Multiple aggregate filters with the same task_id pattern don't share a map

Here are the events:

Id: 0, Source: a, Status: 0
Id: 1, Source: a, Status: 1
Id: 2, Source: a, Status: 0
Id: 3, Source: a, Status: 1
Id: 4, Source: a, Status: 1
Id: 5, Source: a, Status: 0
Id: 6, Source: a, Status: 1
Id: 7, Source: a, Status: 1
Id: 8, Source: a, Status: 1

The goal is to output only the first status 1 event of each consecutive run of status 1 events. So here I expect the events with ids 1, 3 and 6.

I'm using the aggregate filter (as I didn't find anything better for this task):

if [status] == 1 {
   aggregate {
       task_id => "%{source}"
       code => "
       map['count'] ||= 0
       map['count'] += 1

       if (map['count'] > 1)
           # once everything works, I will uncomment this line
           # event.cancel()
       end

       event.set('count', map['count'])
       "
   }
}

if [status] == 0 {
   aggregate {
       task_id => "%{source}"
       end_of_task => true
       code => "
       map['count'] = 0
       event.set('count', map['count'])
       # once everything works, I will uncomment this line
       # event.cancel()
       "
   }
}

Output:

{ "id" => 0, "count" => 0, "source" => "a", "status" => 0 }
{ "id" => 1, "count" => 1, "source" => "a", "status" => 1 }
{ "id" => 2, "count" => 0, "source" => "a", "status" => 0 }
{ "id" => 3, "count" => 2, "source" => "a", "status" => 1 }
{ "id" => 4, "count" => 3, "source" => "a", "status" => 1 }
{ "id" => 5, "count" => 0, "source" => "a", "status" => 0 }
{ "id" => 6, "count" => 4, "source" => "a", "status" => 1 }
{ "id" => 7, "count" => 5, "source" => "a", "status" => 1 }
{ "id" => 8, "count" => 6, "source" => "a", "status" => 1 }

As you can see, the count field isn't reset by the status=0 events.
Note that I run Logstash with "-w 1" to make sure there is only a single worker.

The problem is that the pipeline processes events in batches, so multiple events go through the first filter before anything goes through the second filter.
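The batching effect can be reproduced outside Logstash with a small plain-Ruby model. This is only a sketch, not Logstash code: it assumes each filter is applied to a whole batch before the next filter runs, and run_pipeline, count_filter and reset_filter are hypothetical names. The batch size of 125 is used as the large-batch case because that is the Logstash default.

```ruby
# Toy model (not Logstash itself): every filter is applied to the whole
# batch before the next filter runs. All names here are hypothetical.
EVENTS = [0, 1, 0, 1, 1, 0, 1, 1, 1].each_with_index.map do |status, id|
  { 'id' => id, 'source' => 'a', 'status' => status }
end

def run_pipeline(events, batch_size)
  maps = {} # one map per task_id, shared by both filters

  # Stand-in for the first aggregate block (status == 1).
  count_filter = lambda do |event|
    next unless event['status'] == 1
    map = (maps[event['source']] ||= {})
    map['count'] ||= 0
    map['count'] += 1
    event['count'] = map['count']
  end

  # Stand-in for the second aggregate block (status == 0, end_of_task).
  reset_filter = lambda do |event|
    next unless event['status'] == 0
    event['count'] = 0
    maps.delete(event['source']) # end_of_task => true drops the map
  end

  out = events.map(&:dup)
  out.each_slice(batch_size) do |batch|
    [count_filter, reset_filter].each do |filter|
      batch.each { |event| filter.call(event) }
    end
  end
  out.map { |event| event['count'] }
end

p run_pipeline(EVENTS, 125) # => [0, 1, 0, 2, 3, 0, 4, 5, 6]
p run_pipeline(EVENTS, 1)   # => [0, 1, 0, 1, 2, 0, 1, 2, 3]
```

With one large batch, all status 1 events are counted before any status 0 event resets the map, which matches the output posted above. With a batch size of 1 the reset happens between runs.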

Try '--pipeline.batch.size 1'. It will fix your problem, but it is less efficient.

Thanks, this indeed works now.
Does this mean that "--pipeline.batch.size 1" is absolutely required by the aggregate filter? I didn't see it mentioned anywhere in the documentation.

No, it is not normally required; it is a peculiarity of the particular code blocks you have. I have never before seen an aggregate configuration where something done in the second filter's code block had to be visible to the first filter's code block.

This is my first aggregate filter. This article helped me a lot in understanding how things work. I suspect this setting should be mentioned there as well.
Anyway, thanks for your help.

P.S.
I found another way to make this work (with a single aggregate filter):

aggregate {
    task_id => "%{source}"
    code => "
    map['count'] ||= 0
    map['count'] += 1

    if (event.get('status') == 0)
        map.clear()
        event.cancel()
    else
        if (map['count'] > 1)
            event.cancel()
        end
    end

    event.set('count', map['count'])
    "
}
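To check the logic of that code block without running Logstash, it can be exercised against a minimal stand-in for the event object. This is a sketch under assumptions: FakeEvent and surviving_ids are hypothetical helpers that mimic only the get, set and cancel calls used above, and the per-source map is a plain Hash.

```ruby
# Minimal stand-in for the event API used in the code block above.
# FakeEvent is hypothetical; it only supports get, set and cancel.
class FakeEvent
  def initialize(fields)
    @fields = fields.dup
    @cancelled = false
  end

  def get(key)
    @fields[key]
  end

  def set(key, value)
    @fields[key] = value
  end

  def cancel
    @cancelled = true
  end

  def cancelled?
    @cancelled
  end
end

# Runs the single-filter logic over a list of statuses and returns
# the ids of the events that were not cancelled.
def surviving_ids(statuses)
  maps = Hash.new { |hash, key| hash[key] = {} } # one map per task_id
  events = statuses.each_with_index.map do |status, id|
    FakeEvent.new({ 'id' => id, 'source' => 'a', 'status' => status })
  end

  events.each do |event|
    map = maps[event.get('source')]
    map['count'] ||= 0
    map['count'] += 1

    if event.get('status') == 0
      map.clear
      event.cancel
    elsif map['count'] > 1
      event.cancel
    end

    event.set('count', map['count'])
  end

  events.reject(&:cancelled?).map { |event| event.get('id') }
end

p surviving_ids([0, 1, 0, 1, 1, 0, 1, 1, 1]) # => [1, 3, 6]
```

Because a single filter sees the events one after another, the reset and the count happen in event order regardless of batch size, at least as long as a single worker keeps the events ordered.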
