Aggregate filter plugin

Hi, I created my aggregate filter configuration, but Logstash does not start and reports this error:
[ERROR][logstash.filters.aggregate] Missing a required setting for the aggregate filter plugin:

filter {
aggregate {
code => # SETTING MISSING
...
}
}

This is my filter:

filter {
  if [type] == "four-application_log" {
    grok {
      match => [ "message", "%{DATA:jcaption_id}\s+%{TIME:orario}[\s]+%{LOGLEVEL:log_level}[\s]+\[%{USERNAME:class}\][\s]+\[?%{USERNAME:correlation_id}][\s]+\[%{WORD:}][\s]+\[%{WORD:}][\s]+%{GREEDYDATA:sqlcount}" ]
      match => [ "message", "%{DATA:jcaption_id}\s+%{TIME:orario}[\s]+%{LOGLEVEL:log_level}[\s]+\[%{USERNAME:class}\][\s]+\[?%{USERNAME:correlation_id}][\s]+\[%{WORD:}][\s]+\[%{WORD:}][\s]+%{GREEDYDATA:sqlcount_param}" ]
      match => [ "message", "%{DATA:jcaption_id}\s+%{TIME:orario}[\s]+%{LOGLEVEL:log_level}[\s]+\[%{USERNAME:class}\][\s]+\[%{WORD:}][\s]+\[?%{USERNAME:correlation_id}][\s]+\[%{WORD:}][\s]+%{DATA:}:[\s]+%{BASE10NUM:sqlcount_time}" ]
      match => [ "message", "%{DATA:jcaption_id}\s+%{TIME:orario}[\s]+%{LOGLEVEL:log_level}[\s]+\[%{USERNAME:class}\][\s]+\[?%{USERNAME:correlation_id}][\s]+\[%{WORD:}][\s]+\[%{WORD:}][\s]+%{GREEDYDATA:sqlfetch}" ]
      match => [ "message", "%{DATA:jcaption_id}\s+%{TIME:orario}[\s]+%{LOGLEVEL:log_level}[\s]+\[%{USERNAME:class}\][\s]+\[?%{USERNAME:correlation_id}][\s]+\[%{WORD:}][\s]+\[%{WORD:}][\s]+%{GREEDYDATA:sqlfetch_param}" ]
      match => [ "message", "%{DATA:jcaption_id}\s+%{TIME:orario}[\s]+%{LOGLEVEL:log_level}[\s]+\[%{USERNAME:class}\][\s]+\[%{WORD:}][\s]+\[?%{USERNAME:correlation_id}][\s]+\[%{WORD:}][\s]+%{DATA:}:[\s]+%{BASE10NUM:sqlfetch_time}" ]
    }

   if [sqlcount] {
     aggregate {
       task_id => "%{correlation_id}"
       code => "map['execution_time_temp'] = 0"
       map_action => "create"
     }
   }

   if [sqlcount_param] {
     aggregate {
       task_id => "%{correlation_id}"
       map_action => "update"
     }
   }

   if [sqlcount_time] {
     aggregate {
       task_id => "%{correlation_id}"
       code => "map['execution_time_temp'] += event.get('sqlcount_time')"
       map_action => "update"
     }
   }

   if [sqlfetch] {
     aggregate {
       task_id => "%{correlation_id}"
       map_action => "update"
     }
   }

   if [sqlfetch_param] {
     aggregate {
       task_id => "%{correlation_id}"
       map_action => "update"
     }
   }

   if [sqlfetch_time] {
     aggregate {
       task_id => "%{correlation_id}"
       code => "map['execution_time_temp'] += event.get('sqlfetch_time')"
       map_action => "update"
     }
   }

   if [sqlfetch_time] {
     aggregate {
       task_id => "%{correlation_id}"
       code => "event.set('execution_time', map['execution_time_temp'])"
       map_action => "update"
       end_of_task => true
       timeout => 120
     }
   }
   if ![correlation_id] {
     drop { }
   }
  }
}

Please, I need help!

Several of your aggregate filters do not have a code option. For example:

if [sqlfetch_param] {
    aggregate {
        task_id => "%{correlation_id}"
        map_action => "update"
    }
}

What do you expect that to do?

Hi,

As documented, code is a required configuration option, so every one of your aggregate filters needs it.

[] 09:38:24,510 INFO  [RicercaRichiestaManagerBean][0159351239239862] [COUNT] [SQL] SELECT COUNT(ID) FROM RichiestaLight r  WHERE  r.idRichiesta =:id 
[] 09:38:24,521 INFO  [RicercaRichiestaManagerBean][0159351239239862] [COUNT] [PARAM] key: id value: 000052192988
[] 09:38:24,790 INFO  [RicercaRichiestaManagerBean][RICERCA] [0159351239239862] [COUNT]  execution time: 0.269 seconds
[] 09:38:24,825 INFO  [RicercaRichiestaManagerBean][0159351239239862] [FETCH] [SQL] SELECT r FROM RichiestaLight r  WHERE  r.idRichiesta =:id  AND  ROWNUM <= 500 ORDER BY r.dataInserimento DESC
[] 09:38:24,832 INFO  [RicercaRichiestaManagerBean][0159351239239862] [FETCH] [PARAM] key: id value: 000052192988
[] 09:38:25,166 INFO  [RicercaRichiestaManagerBean][RICERCA] [0159351239239862] [FETCH]  execution time: 0.334 seconds

These are my log lines.
I need to aggregate all the entries with the same id "0159351239239862", and I want to end up with a single event like this:

{
  "sqlcount": "SELECT COUNT(ID) FROM RichiestaLight r  WHERE  r.idRichiesta =:id",
  "key": "000052192988",
  "execution_time": "sum of the 2 times (0.269 + 0.334)",
  "level": "INFO",
  "orario": etc.
}

Your first problem is that none of your grok patterns actually match any of your log lines.

Note that \s is already a character class, so you can use \s+ to match multiple spaces. There is no need to write it as [\s]+.
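As a quick check outside Logstash, the two spellings can be compared in plain Ruby, which uses a regex engine of the same family as grok. This is only a sketch run against one of the sample log lines; the variable names are made up for illustration.

```ruby
# One of the sample log lines from this thread.
line = "[] 09:38:24,790 INFO  [RicercaRichiestaManagerBean][RICERCA] " \
       "[0159351239239862] [COUNT]  execution time: 0.269 seconds"

# Collect every run of whitespace with each spelling.
bracketed = line.scan(/[\s]+/)
plain     = line.scan(/\s+/)

# Both spellings find exactly the same runs, including the double spaces.
puts bracketed == plain
```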

Next, if you look at these two patterns

  match => [ "message", "%{DATA:jcaption_id}\s+%{TIME:orario}[\s]+%{LOGLEVEL:log_level}[\s]+\[%{USERNAME:class}\][\s]+\[?%{USERNAME:correlation_id}][\s]+\[%{WORD:}][\s]+\[%{WORD:}][\s]+%{GREEDYDATA:sqlcount}" ]
  match => [ "message", "%{DATA:jcaption_id}\s+%{TIME:orario}[\s]+%{LOGLEVEL:log_level}[\s]+\[%{USERNAME:class}\][\s]+\[?%{USERNAME:correlation_id}][\s]+\[%{WORD:}][\s]+\[%{WORD:}][\s]+%{GREEDYDATA:sqlcount_param}" ]

they are identical. If the first one matches, grok will never try the second, and if the first does not match, the second will not either. I think the same is true of the two sqlfetch patterns: they will never match, because any line they could match would already have been matched by the sqlcount pattern.
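The first-match-wins behaviour can be sketched in plain Ruby. This is not how grok is implemented; the regex is a simplified, hypothetical stand-in for the two identical patterns, and first_match is an illustrative helper that stops at the first pattern that matches, as grok does by default.

```ruby
# Simplified stand-in for the shared pattern (hypothetical, not the real
# grok expansion): [id] [WORD] [WORD] followed by the rest of the line.
SHARED = /\[(?<correlation_id>\d+)\]\s+\[\w+\]\s+\[\w+\]\s+(?<rest>.*)/

# Two entries with the same regex but different target field names.
PATTERNS = [
  [:sqlcount,       SHARED],
  [:sqlcount_param, SHARED]
].freeze

# Try the patterns in order and stop at the first one that matches.
def first_match(line)
  PATTERNS.each do |field, regex|
    m = regex.match(line)
    return { field => m[:rest] } if m
  end
  nil
end

sql_line   = "[0159351239239862] [COUNT] [SQL] SELECT COUNT(ID) FROM RichiestaLight r"
param_line = "[0159351239239862] [COUNT] [PARAM] key: id value: 000052192988"

# Both lines end up in sqlcount; sqlcount_param is never populated.
p first_match(sql_line).keys
p first_match(param_line).keys
```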

I suggest you take care of the first (common) part of the log lines with one grok, then match the rest of the line against a couple of patterns. Then decide what to do based on which fields get created.

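Note that my patterns are anchored to the start of the line using ^. This makes things faster, because a pattern that cannot match fails immediately instead of being retried at every position in the line.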

Note also the use of (\[%{WORD}\]\s+)? to consume the [RICERCA] that sometimes comes before the correlation id.
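The effect of the optional group can be tried in plain Ruby. The regex below is a hand-written approximation of the grok pattern (\w+ and \d+ stand in for WORD and BASE10NUM), so treat it as a sketch rather than the exact expansion.

```ruby
# Optional "[WORD] " prefix, then the correlation id and the operation.
PREFIX = /\A(?:\[\w+\]\s+)?\[(?<correlation_id>\d+)\]\s+\[(?<operation>\w+)\]/

# The part of two sample lines that follows [RicercaRichiestaManagerBean].
without_prefix = "[0159351239239862] [COUNT] [SQL] SELECT COUNT(ID) FROM RichiestaLight r"
with_prefix    = "[RICERCA] [0159351239239862] [COUNT]  execution time: 0.269 seconds"

[without_prefix, with_prefix].each do |rest_of_line|
  m = PREFIX.match(rest_of_line)
  # The same id and operation come out whether or not [RICERCA] is present.
  puts "#{m[:correlation_id]} #{m[:operation]}"
end
```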

grok { match => { "message" => "^%{DATA:jcaption_id}\s+%{TIME:orario}\s+%{LOGLEVEL:log_level}\s+\[%{USERNAME:class}\]%{GREEDYDATA:[@metadata][restOfLine]}" } }
grok {
    match => {
        "[@metadata][restOfLine]" => [
            "^(\[%{WORD}\]\s+)?\[%{BASE10NUM:correlation_id}]\s+\[%{WORD:operation}\]\s+\[%{WORD:what}\]\s+%{GREEDYDATA:restOfLine}",
            "^(\[%{WORD}\]\s+)?\[%{BASE10NUM:correlation_id}]\s+\[%{WORD:operation}\]\s+%{GREEDYDATA:sqltime}"
        ]
    }
}
if [what] == "PARAM" {
    grok { match => { "restOfLine" => " %{WORD:key}$" } }
}
if [sqltime] {
    grok { match => { "sqltime" => "%{NUMBER:sqltime:float} seconds$" } overwrite => [ "sqltime" ] }
}
aggregate {
    task_id => "%{correlation_id}"
    code => '
        map["execution_time"] ||= 0
        executionTime = event.get("sqltime")
        if executionTime
            map["execution_time"] += executionTime
        end

        operation = event.get("operation")
        what = event.get("what")
        sql = event.get("restOfLine")
        key = event.get("key")

        if operation == "FETCH" and what == "SQL"
            map["sqlfetch"] = sql
        end
        if operation == "COUNT" and what == "SQL"
            map["sqlcount"] = sql
        end
        if key
            map["key"] = key
        end
    '
    push_map_as_event_on_timeout => true
    timeout_task_id_field => "correlation_id"
    timeout => 5
}

You will need to expand this to add the other fields you want.
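To see what the Ruby in the code option does, here is a rough simulation outside Logstash. It assumes the six sample lines have already been parsed by the grok filters into the fields shown (the field values are my assumptions), uses plain hashes with event["..."] in place of event.get, and keeps one map per correlation id, as the aggregate filter does per task_id.

```ruby
ID = "0159351239239862"

# Events as they might look after grok (assumed values from the sample lines).
events = [
  { "correlation_id" => ID, "operation" => "COUNT", "what" => "SQL",
    "restOfLine" => "SELECT COUNT(ID) FROM RichiestaLight r  WHERE  r.idRichiesta =:id " },
  { "correlation_id" => ID, "operation" => "COUNT", "what" => "PARAM",
    "restOfLine" => "key: id value: 000052192988", "key" => "000052192988" },
  { "correlation_id" => ID, "operation" => "COUNT", "sqltime" => 0.269 },
  { "correlation_id" => ID, "operation" => "FETCH", "what" => "SQL",
    "restOfLine" => "SELECT r FROM RichiestaLight r  WHERE  r.idRichiesta =:id  " \
                    "AND  ROWNUM <= 500 ORDER BY r.dataInserimento DESC" },
  { "correlation_id" => ID, "operation" => "FETCH", "what" => "PARAM",
    "restOfLine" => "key: id value: 000052192988", "key" => "000052192988" },
  { "correlation_id" => ID, "operation" => "FETCH", "sqltime" => 0.334 }
]

# One map per task id, created on first use.
maps = Hash.new { |h, k| h[k] = {} }

events.each do |event|
  map = maps[event["correlation_id"]]

  # Same logic as the code option above.
  map["execution_time"] ||= 0
  execution_time = event["sqltime"]
  map["execution_time"] += execution_time if execution_time

  operation = event["operation"]
  what = event["what"]
  sql = event["restOfLine"]
  key = event["key"]

  map["sqlfetch"] = sql if operation == "FETCH" && what == "SQL"
  map["sqlcount"] = sql if operation == "COUNT" && what == "SQL"
  map["key"] = key if key
end

result = maps[ID]
puts result["execution_time"].round(3)
puts result["key"]
```

In the real filter, the map is turned into a new event when the timeout expires, because push_map_as_event_on_timeout is set, and timeout_task_id_field copies the task id into correlation_id on that event.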

I changed the grok patterns to make them unique. Now my grok is:

filter {
  if [type] == "four-application_log" {
    grok {
      match => [ "message", "%{DATA:jcaption_id}\s+%{TIME:orario}[\s]+%{LOGLEVEL:log_level}[\s]+\[%{USERNAME:class}\][\s]+\[?%{USERNAME:correlation_id}][\s]+\[COUNT]\s+\[SQL]+%{GREEDYDATA:sqlcount}" ]
      match => [ "message", "%{DATA:jcaption_id}\s+%{TIME:orario}[\s]+%{LOGLEVEL:log_level}[\s]+\[%{USERNAME:class}\][\s]+\[?%{USERNAME:correlation_id}][\s]+\[COUNT]\s+\[PARAM][\s]+%{GREEDYDATA:sqlcount_param}" ]
      match => [ "message", "%{DATA:jcaption_id}\s+%{TIME:orario}[\s]+%{LOGLEVEL:log_level}[\s]+\[%{USERNAME:class}\][\s]+\[RICERCA][\s]+\[?%{USERNAME:correlation_id}][\s]+\[COUNT][\s]+%{DATA:}:[\s]+%{BASE10NUM:sqlcount_time}" ]
      match => [ "message", "%{DATA:jcaption_id}\s+%{TIME:orario}[\s]+%{LOGLEVEL:log_level}[\s]+\[%{USERNAME:class}\][\s]+\[?%{USERNAME:correlation_id}][\s]+\[FETCH]\s+\[SQL][\s]+%{GREEDYDATA:sqlfetch}" ]
      match => [ "message", "%{DATA:jcaption_id}\s+%{TIME:orario}[\s]+%{LOGLEVEL:log_level}[\s]+\[%{USERNAME:class}\][\s]+\[?%{USERNAME:correlation_id}][\s]+\[FETCH]\s+\[PARAM][\s]+%{GREEDYDATA:sqlfetch_param}" ]
      match => [ "message", "%{DATA:jcaption_id}\s+%{TIME:orario}[\s]+%{LOGLEVEL:log_level}[\s]+\[%{USERNAME:class}\][\s]+\[RICERCA][\s]+\[?%{USERNAME:correlation_id}][\s]+\[FETCH][\s]+%{DATA:}:[\s]+%{BASE10NUM:sqlfetch_time}" ]
    }

Now, with these patterns, how can I aggregate?
