Aggregation Rule

Hi there. I have plenty of logs on my logstash, and they look like this:

Mar 17 10:43:04 xfirewall CEF:0|infotecs|xf|5.4|62|Non-encrypted forwarded IP packet passed|5|start=1742190120000 end=1742190120000 src=**** dst=**** spt=42202 dpt=4506 proto=tcp cnt=1 out=52 deviceOutboundInterface=eth9.2069 cn1=800h cn1Label=Eth protocol hex cs1=NAT no cs2=Drop no cs3=Broadcast no cs4=Forward yes cs5=Encrypted no cs1Label=Flags cs6=unknown cs6Label=Application name deviceDirection=outbound app=TCP suser= deviceExternalId=05bf2548

They all have the "xfirewall" label at the beginning of the message.
I need to aggregate such logs into one if they have the same "src" and "dst" values (src1 = src2, dst1 = dst2). I already have a rule, but it doesn't work. Here it is:

filter {
   grok {
      match => { "message" => "%{MONTH} %{MONTHDAY} %{TIME} %{WORD:xf} %{WORD:cef}:0\|%{WORD}\|%{WORD}\|.{1,3}\|.+src=%{IPV4:src} dst=%{IPV4:dst}.+proto=%{URIPROTO:proto}.+cs2=%{WORD:state}.+cs4=%{WORD:state2}" }
   }
   aggregate {
      task_id => "%{[src]}+%{[dst]}"
      code => "
        map['count'] ||= 0
        map['count'] += 1
        event.set('aggregated_count', map['count'])
      "
      push_map_as_event_on_timeout => true
      timeout_task_id_field => "task_id"
      timeout => 10
   }
}

Can anyone tell me what I am doing wrong? The grok parser works as it should, by the way. This is the result of parsing:

(ignore this "null")

So, I accept any suggestions on what I should do to fix this and am thankful for any help.
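
A stand-alone simulation of the aggregate filter in the post above, added here as an example (not the poster's code and not the Logstash plugin itself), so the per-event and timeout output the rule should produce can be checked without a running pipeline; the sample CEF lines and addresses are constructed.

```ruby
# Added example (not from the original post): simulates the aggregate filter's
# map/timeout behaviour for the task_id "%{[src]}+%{[dst]}" rule.

# Constructed sample lines in the CEF shape of the post (addresses are made up).
SAMPLE_LINES = [
  "Mar 17 10:43:04 xfirewall CEF:0|infotecs|xf|5.4|62|Non-encrypted forwarded IP packet passed|5|start=1 end=1 src=10.0.0.5 dst=10.0.1.9 spt=42202 dpt=4506 proto=tcp cs2=Drop no cs4=Forward yes",
  "Mar 17 10:43:05 xfirewall CEF:0|infotecs|xf|5.4|62|Non-encrypted forwarded IP packet passed|5|start=1 end=1 src=10.0.0.5 dst=10.0.1.9 spt=42203 dpt=4506 proto=tcp cs2=Drop no cs4=Forward yes",
  "Mar 17 10:43:06 xfirewall CEF:0|infotecs|xf|5.4|62|Non-encrypted forwarded IP packet passed|5|start=1 end=1 src=10.0.0.7 dst=10.0.1.9 spt=42204 dpt=4506 proto=tcp cs2=Drop no cs4=Forward yes"
].freeze

# Equivalent of the grok captures the aggregate filter relies on (src, dst, proto).
def parse_event(line)
  m = /xfirewall CEF:0\|.*src=(?<src>[\d.]+) dst=(?<dst>[\d.]+).*proto=(?<proto>\w+)/.match(line)
  return nil unless m
  { 'message' => line, 'src' => m[:src], 'dst' => m[:dst], 'proto' => m[:proto] }
end

# Mirrors the filter: map['count'] grows per event and, with
# push_map_as_event_on_timeout, the map is emitted as a new event `timeout`
# seconds after the task's first event. The original events are NOT dropped:
# the code block only adds aggregated_count, so every input still reaches the
# output unless event.cancel is called. The real filter also needs
# pipeline.workers set to 1, otherwise maps are split across workers.
class AggregateSim
  def initialize(timeout: 10)
    @timeout = timeout
    @maps = {} # task_id => { 'count' => n, 'created' => t }
  end

  # Returns the input event enriched with aggregated_count.
  def filter(event, now)
    task_id = "#{event['src']}+#{event['dst']}"
    map = (@maps[task_id] ||= { 'count' => 0, 'created' => now })
    map['count'] += 1
    event.merge('aggregated_count' => map['count'])
  end

  # Expired maps are removed and returned as standalone events carrying task_id.
  def flush(now)
    expired = @maps.select { |_, m| now - m['created'] >= @timeout }
    expired.each_key { |k| @maps.delete(k) }
    expired.map { |task_id, m| { 'task_id' => task_id, 'count' => m['count'] } }
  end
end

# Feed the three sample lines one second apart, then flush at the given time.
def run_sample(flush_at: 20)
  sim = AggregateSim.new(timeout: 10)
  enriched = SAMPLE_LINES.each_with_index.map { |l, i| sim.filter(parse_event(l), i) }
  [enriched, sim.flush(flush_at)]
end
```

With the rule as written, the SIEM receives every original event (each carrying aggregated_count) plus one extra event per src/dst pair after the timeout, which is what to look for when checking whether the filter runs at all.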

Please show us one of the events that you are trying to aggregate (does it have the [aggregated_count] field) and one of the events that the aggregate filter generates when the timeout occurs.

It doesn't have the [aggregated_count] field originally; you can see an example of the event at the beginning of my post.

I don't really know how to see the aggregated (filtered) events; I tried using tcpdump, but the events are just the same. The thing is, I send these events to a SIEM, and they arrive at the SIEM just the same. So I think the rule doesn't work.