How does aggregate with different task_id values work in one filter block?

Could anyone help explain how the aggregate filter works when different task_id values are used in one filter block? Will the second aggregate walk through all existing events again? And is there any memory limit setting that applies when performing the aggregation?

Take the config below as an example. I want the first aggregate to key on "InID", walk through all events, and then finish that task. After that, the second aggregate runs, this time with the task_id changed to "EvenID"; I suppose it will walk through all existing events again (including the aggregated events from the first aggregate).

I have already set pipeline.workers: 1 and pipeline.java_execution: false, and I am using Logstash 7.8.
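
For reference, this is roughly how those settings look in logstash.yml (a minimal sketch; only the two settings mentioned above come from my setup, everything else is default):

# logstash.yml (sketch; only these two settings are relevant here)
pipeline.workers: 1            # single worker so events are processed in order
pipeline.java_execution: false # use the old Ruby execution engine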

Below is my Logstash config -

input {
  file {
    path => "/apps/logstash/rawData/logData.txt"
    file_sort_by => "last_modified"
    mode => "read"
    start_position => "beginning"
  }
}
filter {
  grok {
    match => { "message" => [
      "Fixinbound\s+%{TIME:TIME1}\s+.*\s+%{NOTSPACE:InID}",
      "outbound\s+%{TIME:TIME2} %{NOTSPACE:EvenID} %{NOTSPACE:InID}",
      "bridge_in\s+%{TIME:TIME3} %{NOTSPACE:EvenID} %{NOTSPACE:OutID}"
    ] }
  }

  mutate {
    remove_field => [ "@version", "host", "path", "@timestamp" ]
  }

  ######## 1st round aggregate ########

  if [message] =~ /^Fixinbound */ {
    aggregate {
      task_id => "%{InID}"
      code => "map['TIME1'] ||= event.get('TIME1');"
      map_action => "create"
    }
    # drop the Fixinbound event once its TIME1 is stored in the map
    drop {}
  }

  if [message] =~ /^outbound */ {
    aggregate {
      task_id => "%{InID}"
      code => "event.set('TIME1', map['TIME1']);"
      map_action => "update"
      end_of_task => true
      timeout => 1800
    }
  }

  ######## 2nd round aggregate ########

  if [message] =~ /^bridge_in */ {
    aggregate {
      task_id => "%{EvenID}"
      code => "map['TIME3'] ||= event.get('TIME3'); map['OutID'] ||= event.get('OutID');"
      map_action => "create"
    }
    # drop the bridge_in event once TIME3 and OutID are stored in the map
    drop {}
  }

  if [message] =~ /^outbound */ {
    aggregate {
      task_id => "%{EvenID}"
      code => "event.set('TIME3', map['TIME3']); event.set('OutID', map['OutID']);"
      map_action => "update"
      end_of_task => true
      timeout => 1800
    }
  }
}

output {
  csv {
    fields => [ "TIME1", "TIME2", "TIME3", "InID", "OutID" ]
    path => "/apps/logstash/result.csv"
  }
}

My input file looks like this -

Fixinbound t11 10 5367
Fixinbound t12 11 5368
Fixinbound t13 12 5369
Fixinbound t14 13 5370
... (N lines "Fixinbound*")
outbound T21 2230 5367
outbound T22 2231 5368
outbound T23 2232 5369
outbound T24 2233 5370
... (N lines "outbound*")
bridge_in t31 2230 BL000
bridge_in t32 2231 BL111
bridge_in t33 2232 BL222
bridge_in t34 2233 BL333
... (N lines "bridge_in*")

I tried inputs of different sizes. When N<=30, the output is perfect. However, when 30<N<100, only around 25 records are combined successfully; the others cannot be matched and only contain TIME2 and the two IDs carried by the "outbound" message. I also tried N=1000 and N=3000, and then no events at all are aggregated together.

Please advise. Thank you in advance.

I think I found the solution -
Increase pipeline.batch.size in pipelines.yml, or use the -b $NUM flag when launching Logstash.
The default pipeline.batch.size is 125, which I suppose is not enough for my case, where thousands of events have to be handled at one time. I know it loses some efficiency, but this is the only way I have found so far to solve my case.
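
For illustration, a minimal sketch of the two ways to raise the batch size (the pipeline id, config path and the value 5000 are assumptions for the example, not my exact values):

# pipelines.yml - per-pipeline settings (id, path and size chosen for illustration)
- pipeline.id: main
  path.config: "/apps/logstash/my_pipeline.conf"
  pipeline.workers: 1
  pipeline.batch.size: 5000

# or equivalently on the command line:
# bin/logstash -f /apps/logstash/my_pipeline.conf -b 5000 -w 1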
