GROK filter cannot be applied on aggregated events

Hi,

All is in the title; here is my Aggregate plugin configuration:

    aggregate {
        task_id => "%{id}"
        code => "map['message'] ||= '' ; map['message'] += event['message'] + '\n'"
        add_tag => ['start']
    }

    if [message] =~ "(?:( tunnel .* started\.)|(Access policy: Deny))" {
        aggregate {
            task_id => "%{id}"
            code => "event['message'] = map['message']"
            end_of_task => true
            add_tag => ['end']
        }
    }

Thanks to the configuration above, my events are tied up as wanted, which is a good point. On the other hand, when I make a search using grok fields, it works fine when the search matches the low-level events (the non-aggregated ones). But something goes wrong when the filter I type in the Kibana search bar should match the aggregated events: it behaves as if those events were not related at all, and no search result is displayed.

A search including only the expressions I am looking for (without using grok fields) works fine though.

This is really annoying, I've been looking for a solution for weeks now, but alas, found nothing that could explain this.

Does it ring a bell to anyone?

You provided info on what you are doing in Logstash, but my reading is that you are having an issue searching for an "aggregate" field in Kibana. Could you provide further details on the search issue?

Actually my events are grokked before being aggregated. Since one syslog event is split into several lines before it is sent to Logstash, I have to tie those lines back together to recover the whole event as it was before it was fractioned.

The grok filter works fine for the low-level events (the non-gathered syslog lines), and the aggregate filter works fine too, since I can gather related lines into one single event. But once the gathering is done by the aggregate filter, I cannot use the grok fields for searching anymore.

Suppose I have these two lines tied up to form one single event:

Feb 16 17:29:04 slot1/LTM notice apd[5515]: 01490010:5: 1ec2b273: Username 'cjones'
Feb 16 17:29:04 slot1/LTM warning apd[5515]: 01490106:4: 1ec2b273: AD module: authentication with 'cjones' failed: Preauthentication failed, principal name: cjones@GEEKO.COM. Invalid user credentials.  (-1765328360)

And suppose these lines were grokked before being aggregated, yielding a "username": cjones field for line 1 and an "email": cjones@GEEKO.COM field for line 2.

When I perform the following search: username : cjones AND email : cjones@GEEKO.COM, I get nothing out of it. But when I perform this: "cjones" AND "cjones@GEEKO.COM", it works OK.

When I perform this: username : cjones, I get a result from the low-level event only (the document containing line 1), and this is a problem for me as well, because I need my results to be fetched from the documents created after the aggregation, not from the low-level events. I actually want the low-level event documents to disappear completely after the aggregate process. This doesn't seem to be the case on my platform.

Conclusion: When the events are tied up, searches that use Grok fields cannot be applied.

When you search in kibana without defining a specific field, it searches the _all field, which is a special field that all indexed strings are put into. Most likely, your search is hitting on the "message" field.

Could you run the following query, and post the results.

GET _search
{
  "size": 2, 
  "query": {
    "bool": {
      "filter": {
        "query_string": {
          "query": "\"cjones\" AND \"cjones@GEEKO.COM\""
        }
      }
    }
  }
}
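
For comparison, a field-scoped version of the same search would look like the sketch below (illustrative only; it will return hits only if the username and email fields actually exist on the indexed document):

GET _search
{
  "size": 2,
  "query": {
    "bool": {
      "filter": {
        "query_string": {
          "query": "username:cjones AND email:\"cjones@GEEKO.COM\""
        }
      }
    }
  }
}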

Hi, thanks for answering.

Here is what I get after typing the query:

curl -XGET 'http://ELKserver/logstash-*/_search?pretty' -d '{
  "size": 2,
  "query": {
    "bool": {
      "filter": {
        "query_string": {
          "query": "\"cjones\" AND \"cjones@GEEKO.COM\""
        }
      }
    }
  }
}'

{
  "took" : 1831,
  "timed_out" : false,
  "_shards" : {
    "total" : 140,
    "successful" : 140,
    "failed" : 0
  },
  "hits" : {
    "total" : 1,
    "max_score" : 0.0,
    "hits" : [ {
      "_index" : "logstash-2016.07.12",
      "_type" : "logstash",
      "_id" : "AVXgPrdV2kEjcao1vk8K",
      "_score" : 0.0,
      "_source" : {"message":"<14>Jul 12 19:54:13\nCiscoMXTextMailLogs_Logstash: Info: MID 8888888 ICID 9999999 RID 0 To: <cjones@GEEKO.com>","@version":"1","@timestamp":"2016-07-12T17:54:13.566Z","type":"logstash","host":"189.18.16.89","tags":["_grokparsefailure_sysloginput","grokked_syslog\n"],"priority":0,"severity":0,"facility":0,"facility_label":"kernel","severity_label":["Emergency","Info"],"received_at":"2016-07-12T17:54:13.566Z","hostname":"hostname","syslog_severity_code":5,"syslog_facility_code":1,"syslog_facility":"user-level","syslog_severity":"notice","syslog_pri":"14","info":"MID\n8888888 ICID 9999999 RID 0 To: < cjones@GEEKO.com >"}
    } ]
  }
}

The username and email fields do not exist in that document. I can see that it has a _grokparsefailure_sysloginput and a grokked_syslog\n tag. It seems that there is an issue in the way that you are processing these messages in logstash.

An additional observation: the searches for cjones and cjones@GEEKO.com are hitting on the same cjones@GEEKO.com string, since I do not see an individual instance of just <space>cjones<space> anywhere in the returned document.

{
  "_index": "logstash-2016.07.12",
  "_type": "logstash",
  "_id": "AVXgPrdV2kEjcao1vk8K",
  "_score": 0,
  "_source": {
    "message": "<14>Jul 12 19:54:13\nCiscoMXTextMailLogs_Logstash: Info: MID 8888888 ICID 9999999 RID 0 To: <cjones@GEEKO.com>",
    "@version": "1",
    "@timestamp": "2016-07-12T17:54:13.566Z",
    "type": "logstash",
    "host": "189.18.16.89",
    "tags": [
      "_grokparsefailure_sysloginput",
      "grokked_syslog\n"
    ],
    "priority": 0,
    "severity": 0,
    "facility": 0,
    "facility_label": "kernel",
    "severity_label": [
      "Emergency",
      "Info"
    ],
    "received_at": "2016-07-12T17:54:13.566Z",
    "hostname": "hostname",
    "syslog_severity_code": 5,
    "syslog_facility_code": 1,
    "syslog_facility": "user-level",
    "syslog_severity": "notice",
    "syslog_pri": "14",
    "info": "MID\n8888888 ICID 9999999 RID 0 To: < cjones@GEEKO.com >"
  }
}

Sorry, you answered before I could update the post and warn you that the message returned is actually related to another document (other equipment), and not to the one that is supposed to be aggregated. Which means the aggregated logs actually return nothing.

Do you want to run the query on the logs processed by logstash, or only on the two lines above? Is there a way to run the query on a specified log file?

I tried to run the query on other strings I was sure I could get an answer for. What comes back is aggregated but non-grokked documents. Once aggregation is done, am I supposed to grok again?

I am confused about your problem at this point. Here is a simple example; hopefully this helps.

Logstash config

input {
  stdin { }
}

filter {
  grok {
    match => { 'message' => '%{NUMBER:id}, %{DATA:field1}, %{GREEDYDATA:field2}' }
  }

  if [id] {
    aggregate {
      task_id => "%{id}"
      code => "map['message'] ||= '' ; map['message'] += event['message'] + '\n'"
      add_tag => ['start']
    }

    if [message] =~ "end" {
      aggregate {
        task_id => "%{id}"
        code => "event['message'] = map['message']"
        end_of_task => true
        add_tag => ['end']
      }
    }
  }
}

output {
  stdout {
    codec => rubydebug { metadata => true }
  }
}

bash

$ bin/logstash -f ../logstash-stdin.config
Settings: Default pipeline workers: 4
Pipeline main started
1, start, foo1
{
       "message" => "1, start, foo1",
      "@version" => "1",
    "@timestamp" => "2016-07-23T16:04:56.661Z",
          "host" => "hostname1",
            "id" => "1",
        "field1" => "start",
        "field2" => "foo1",
          "tags" => [
        [0] "start"
    ]
}
1, end, foo2
{
       "message" => "1, start, foo1\\n1, end, foo2\\n",
      "@version" => "1",
    "@timestamp" => "2016-07-23T16:05:03.699Z",
          "host" => "hostname1",
            "id" => "1",
        "field1" => "end",
        "field2" => "foo2",
          "tags" => [
        [0] "start",
        [1] "end"
    ]
}

Notice that field1 and field2 contain values from the second message (1, end, foo2), not the first message. You could use the code option to control how these fields are carried across the start, update (not used here), and end aggregations.
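
If the goal is to keep the grokked fields on the aggregated document, one possible approach (just a sketch, reusing the event['field'] syntax from the example above; the username field name is borrowed from your earlier example) is to stash those fields in the map as lines arrive and copy them back onto the event in the end-of-task aggregate:

aggregate {
  task_id => "%{id}"
  # accumulate the raw message and remember any grok fields seen so far
  code => "map['message'] ||= '' ; map['message'] += event['message'] + '\n' ; map['username'] ||= event['username']"
  add_tag => ['start']
}

if [message] =~ "end" {
  aggregate {
    task_id => "%{id}"
    # copy the accumulated values back onto the final, aggregated event
    code => "event['message'] = map['message'] ; event['username'] = map['username'] if map['username']"
    end_of_task => true
    add_tag => ['end']
  }
}

The same idea applies to any other grokked field (email, tunnel_id, and so on): whatever you want to search on after aggregation has to be written into the map and then back onto the final event.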

I am going to post, once and for all, my whole configuration, so that you could perhaps tell me what's wrong with it, because my grok fields are still not there when the data is aggregated.

    grok {
        break_on_match => false
        patterns_dir => "/path/to/pattern"
        match => [ "message", "<%{INT:syslog_pri}>%{SYSLOGTIMESTAMP} slot1\/%{HOSTNAME:logsource} %{LOGLEVEL:severity_label} %{SYSLOGPROG}: %{SEQ:sequence}: %{ID:id}: %{GREEDYDATA:info}" ]
        add_tag => [ "grokked_syslog_apm" ]
        tag_on_failure => [ "ERROR : grok matching failed" ]
    }

    if [program] == "apd" {
        grok {
            patterns_dir => "/path/to/pattern"
            break_on_match => false
            match => [
                "info", "User chose option: %{NUMBER:client_option}",
                "info", "Username '%{USERNAME:username}'",
                "info", "Access policy result: %{GREEDYDATA:policy_result}"
            ]
            match => [ "info", "%{GREEDYDATA:info}" ]
            remove_tag => "grokked_syslog"
            add_tag => "grokked_syslog_apd"
            overwrite => [ "info" ]
        }
    }
    else {
        grok {
            patterns_dir => "/path/to/pattern"
            break_on_match => false
            match => [
                "info", "User-Agent header is: %{GREEDYDATA:UA_reason}",
                "info", "Received User-Agent header: %{GREEDYDATA:UA}",
                "info", "PPP tunnel %{BASE16NUM:tunnel_id} (started|closed)."
            ]
            match => [ "info", "%{GREEDYDATA:info}" ]
            overwrite => [ "info" ]
        }
    }

    geoip {
        source => "clientip"
        target => "geoip"
        database => "/pathtodatabase/database_file.dat"
        add_field => [ "[geoip][coordinates]", "%{[geoip][longitude]}" ]
        add_field => [ "[geoip][coordinates]", "%{[geoip][latitude]}" ]
    }

    mutate {
        convert => [ "[geoip][coordinates]", "float" ]
    }

    if [id] {
        aggregate {
            task_id => "%{id}"
            code => "map['message'] ||= '' ; map['message'] += event['message'] + '\n'"
            add_tag => ['start']
        }

        if [message] =~ "(?:(PPP tunnel .* started\.)|(Access policy result: Logon_Deny))" {
            aggregate {
                task_id => "%{id}"
                code => "event['message'] = map['message']"
                end_of_task => true
                add_tag => ['end']
            }
        }
    }

Do you see something wrong?
Sorry, I can't run a debug output; I don't have enough rights on the output file.

I have the feeling the grok filter has a problem. My Kibana statistics seem to be distorted by the fact that the grok filter creates a document with gathered lines, in addition to the low-level document.

Thus, when I look for a username, for instance, in Kibana, the field is counted twice, since it is contained in both the low-level documents and the grokked ones.
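
One idea I am considering (an untested sketch; it assumes that only the aggregated event ever gets the 'end' tag) is to drop every event that is not the aggregated one, right after the aggregate filters, so the low-level lines never reach Elasticsearch:

# placed after the aggregate filters, still inside the filter block
if "end" not in [tags] {
  drop { }
}

The obvious caveat is that this would also drop any lines that never reach the end-of-task condition, so those would be lost entirely.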

Hey there, me again

I really can't get it to work; I've tried everything, but in vain.
Could someone tell me what's wrong with my configuration?

Thanks again