Logstash - Filter.Aggregate - push_map_as_event_on_timeout not happening

Hi there,
Thanks in advance for your time

I'm trying to use filter { aggregate { } } while reading/parsing an old log file.
In this log file, a block of statistics is generated every second, and I'd like to aggregate each block together. For instance, something like =>

....
2020-07-27 15:46:11.151282698 line 1
2020-07-27 15:46:11.151282207 line 2
2020-07-27 15:46:11.151283514 line 3
2020-07-27 15:46:11.155536588 {keyword} stats of AAA I(1, 2) .. IP(3, 4) ..
2020-07-27 15:46:11.155567522 {keyword} stats of BBB I(5, 6) .. IP(7, 8) ..
2020-07-27 15:46:11.155578544 {keyword} stats of CCC I(9, 10) .. IP(11, 12) ..
2020-07-27 15:46:11.195195669 line 4
.....
2020-07-27 15:46:12.155536588 {keyword} stats of AAA I(45, 46) .. IP(47, 48) ..
2020-07-27 15:46:12.155567522 {keyword} stats of BBB I(49, 50) .. IP(51, 52) ..
.....

(... values are fake and random ....)

I first filter the lines where the keyword exists: if [message] =~ / \{keyword\} / {
Then I extract creationtime, I_sec, I_tot, IP_sec and IP_tot from those lines, like:

creationtime .. {action} .. I(I_tot, I_sec) .. IP(IP_tot, IP_sec) ..

...with [action] being my keyword

Everything's fine so far: all my documents are successfully inserted into Elasticsearch.
But I'm missing the aggregated fields I_sec_sum, I_tot_sum, IP_sec_sum and IP_tot_sum, which are supposed to be the sum of each attribute per second. E.g. in the example above, for second 11, I_sec_sum would be 2+6+10 and IP_tot_sum would be 3+7+11.
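
To make the expected numbers concrete, the 15:46:11 block above would give:

I_sec_sum  = 2 + 6 + 10 = 18
IP_tot_sum = 3 + 7 + 11 = 21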

My filter looks like =>

filter {
  if [type] == "mytype" {
    if [message] =~ / \{keyword\} / {
      # extract creationtime, the action keyword and the four counters from the stats line
      grok {
        pattern_definitions => {
          "TIME1" => "%{YEAR}-%{MONTHNUM2}-%{MONTHDAY} %{HOUR}:%{MINUTE}:%{SECOND}"
        }
        match => { "message" => "%{TIME1:creationtime} %{GREEDYDATA:to-delete-1} \{%{WORD:action}\} %{GREEDYDATA:to-delete-2} I\(%{INT:I_tot:int}, %{INT:I_sec:int}\) %{GREEDYDATA:to-delete-3} IP\(%{INT:IP_tot:int}, %{INT:IP_sec:int}\) %{GREEDYDATA:to-delete-4}" }
      }
      date {
        locale => "en"
        match => ["creationtime", "YYYY-MM-dd HH:mm:ss.SSSSSSSSS"]
        target => "creationtime"
      }
      # sum I_sec per action; push the map as a new event once the creationtime
      # timestamps show inactivity_timeout seconds without a new line for that action
      aggregate {
        timeout_timestamp_field => "creationtime"
        task_id => "%{action}"
        code => "map['I_sec_sum'] ||= 0; map['I_sec_sum'] += event.get('I_sec');"
        push_map_as_event_on_timeout => true
        timeout_task_id_field => "action"
        inactivity_timeout => 0.3
      }
      mutate { remove_field => [ "host", "sequence", "path", "to-delete-1", "to-delete-2", "to-delete-3", "to-delete-4", "message", "qs_id", "action" ] }
    } else {
      drop {}
    }
  }
}

I've started testing with only I_sec_sum for now, so of course the code is not finished yet ...
I have no errors in the Logstash logs, and the correct number of lines containing the keyword is inserted into Elasticsearch ... but not the events that I expected to be triggered after my timeout of 0.3 sec ...
Any idea what could be wrong here? It looks like the aggregate filter never pushes any document ...

From my understanding, I'd expect the timer to be reset to 0 every time a line with the task_id is detected, and a document like the one below to be pushed when the timer reaches inactivity_timeout: { "action": "keyword", "I_sec_sum": 18 }
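
To visualise it, the full event I'd expect the aggregate filter to push would look roughly like this (@timestamp and @version are added automatically by Logstash to any new event; the values shown here are just placeholders):

{
  "@version": "1",
  "@timestamp": "2020-07-27T15:46:11.455Z",
  "action": "keyword",
  "I_sec_sum": 18
}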

Thanks !!
Guillaume

Does this help?

Hi @Badger,

Thanks - it must be, yes, since it's enabled by default.
However, I'm using logstash.version=>7.7.0, and adding pipeline.java_execution: false to logstash.yml doesn't seem to have changed anything ... actually, I'm not sure the change was taken into account after restarting Logstash ...

Could you please confirm that this is the correct way to disable java_execution?

EDIT
I also tried the command-line flag bin/logstash --path.settings /opt/elk/logstash/config/$LOGNAME -f /opt/elk/logstash/config/$LOGNAME/conf.d --java-execution false & but got the same result ...
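
For reference, this is the exact line I added to logstash.yml (assuming I've understood the setting correctly):

# logstash.yml - switch the pipeline back to the Ruby execution engine
pipeline.java_execution: false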

Actually, when starting up Logstash I see this line =>
[2020-07-30T14:47:12,994][INFO ][logstash.pipeline ] Starting pipeline {:pipeline_id=>"main", "pipeline.workers"=>8, "pipeline.batch.size"=>125, "pipeline.batch.delay"=>50, "pipeline.sources"=>["/opt/elk/logstash/config/quod3/conf.d/quod-fh.conf"]}

I'm assuming java_execution should appear in that settings list too.

thanks
Guillaume

Yes, adding that to logstash.yml should do it.
