Hi there. I have plenty of logs on my Logstash, and they look like this:
Mar 17 10:43:04 xfirewall CEF:0|infotecs|xf|5.4|62|Non-encrypted forwarded IP packet passed|5|start=1742190120000 end=1742190120000 src=**** dst=**** spt=42202 dpt=4506 proto=tcp cnt=1 out=52 deviceOutboundInterface=eth9.2069 cn1=800h cn1Label=Eth protocol hex cs1=NAT no cs2=Drop no cs3=Broadcast no cs4=Forward yes cs5=Encrypted no cs1Label=Flags cs6=unknown cs6Label=Application name deviceDirection=outbound app=TCP suser= deviceExternalId=05bf2548
They all have the "xfirewall" label at the beginning of the message.
I need to aggregate such logs into one if they have the same "src" and "dst" values (src1 = src2, dst1 = dst2). I already have a rule, but it doesn't work. Here it is:
filter {
  grok {
    match => { "message" => "%{MONTH} %{MONTHDAY} %{TIME} %{WORD:xf} %{WORD:cef}:0\|%{WORD}\|%{WORD}\|.{1,3}\|.+src=%{IPV4:src} dst=%{IPV4:dst}.+proto=%{URIPROTO:proto}.+cs2=%{WORD:state}.+cs4=%{WORD:state2}" }
  }
  # aggregate keeps its maps in one worker only: run the pipeline with pipeline.workers: 1
  aggregate {
    # one task per src+dst pair; the timeout is counted from the task's first event
    task_id => "%{[src]}+%{[dst]}"
    code => "
      # keep src/dst in the map, otherwise the event pushed on timeout only carries 'count'
      map['src'] ||= event.get('src')
      map['dst'] ||= event.get('dst')
      map['count'] ||= 0
      map['count'] += 1
      event.set('aggregated_count', map['count'])
      # add event.cancel() here to drop the individual events and keep only the timeout summary
    "
    # on expiry, push the map as a new event with the task id stored in 'task_id'
    push_map_as_event_on_timeout => true
    timeout_task_id_field => "task_id"
    timeout => 10
  }
}
Can anyone tell me what I am doing wrong? The grok parser works as it should, by the way. This is the result of parsing:
So, I accept any suggestions on what I should do to fix this and am thankful for any help.
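As an added example that is not part of the original post or of Logstash itself, the following stand-alone Ruby script models what the aggregate filter above is meant to do: parse the syslog-prefixed CEF line into fields, key a map on src+dst, count the events per key, and push each map as one summary event once its timeout (counted from the task's first event, as the plugin documents) has elapsed. The sample lines are constructed; the masked addresses from the post are replaced with RFC 5737 documentation addresses, and the clock is an injected integer so the timeout behaviour can be checked deterministically.

```ruby
# Added example, not from the original post. Models the Logstash aggregate filter
# logic (task per src+dst, count per task, push map on timeout) outside a pipeline.

CEF_HEADER_FIELDS = %w[version vendor product device_version signature_id name severity].freeze

# Parse one "Mon DD HH:MM:SS host CEF:0|...|ext" line into a Hash of header fields
# plus extension key=value pairs. Extension values may contain spaces
# ("cn1Label=Eth protocol hex"), so a value ends at the next "key=" token, not at
# the next space. Escaped pipes (\|) in the header are not handled; the sample has none.
def parse_cef(line)
  cef_start = line.index('CEF:')
  return nil unless cef_start
  host = line[0...cef_start].split[3]            # "Mar 17 10:43:04 xfirewall " -> "xfirewall"
  parts = line[(cef_start + 4)..-1].split('|', 8) # 7 header fields + the extension block
  return nil if parts.size < 8
  event = { 'host' => host }
  CEF_HEADER_FIELDS.each_with_index { |k, i| event[k] = parts[i] }
  parts[7].scan(/(\w+)=(.*?)(?=\s+\w+=|\z)/).each { |k, v| event[k] = v.strip }
  event
end

# Mirror of the aggregate filter: one map per (src, dst) task, flushed as a single
# summary event once `timeout` seconds have passed since the task's first event.
class SrcDstAggregator
  def initialize(timeout:)
    @timeout = timeout
    @maps = {} # task_id => { 'src', 'dst', 'count', 'created' }
  end

  # Equivalent of the filter's `code` block; returns the task_id it used.
  def add(event, now)
    task_id = "#{event['src']}+#{event['dst']}"
    map = (@maps[task_id] ||= { 'src' => event['src'], 'dst' => event['dst'],
                                'count' => 0, 'created' => now })
    map['count'] += 1
    task_id
  end

  # Equivalent of push_map_as_event_on_timeout with timeout_task_id_field => "task_id":
  # returns the expired maps as events and removes them from the store.
  def flush(now)
    expired, live = @maps.partition { |_, m| now - m['created'] >= @timeout }
    @maps = live.to_h
    expired.map { |task_id, m| m.merge('task_id' => task_id).reject { |k, _| k == 'created' } }
  end
end

# Constructed sample lines (assumption: documentation addresses instead of the masked ones).
EXT = 'proto=tcp cnt=1 out=52 deviceOutboundInterface=eth9.2069 cn1=800h cn1Label=Eth protocol hex ' \
      'cs1=NAT no cs2=Drop no cs3=Broadcast no cs4=Forward yes cs5=Encrypted no cs1Label=Flags ' \
      'cs6=unknown cs6Label=Application name deviceDirection=outbound app=TCP suser= deviceExternalId=05bf2548'
HDR = 'Mar 17 10:43:04 xfirewall CEF:0|infotecs|xf|5.4|62|Non-encrypted forwarded IP packet passed|5|'
SAMPLE_LINES = [
  "#{HDR}start=1742190120000 end=1742190120000 src=192.0.2.10 dst=198.51.100.20 spt=42202 dpt=4506 #{EXT}",
  "#{HDR}start=1742190121000 end=1742190121000 src=192.0.2.10 dst=198.51.100.20 spt=42203 dpt=4506 #{EXT}",
  "#{HDR}start=1742190122000 end=1742190122000 src=192.0.2.10 dst=198.51.100.21 spt=42204 dpt=4506 #{EXT}",
].freeze

# Feed all sample lines at t=0 and check expiry at t=5 (nothing pushed yet) and at t=10
# (the filter's timeout, when Logstash would push the maps as events).
def aggregate_sample
  agg = SrcDstAggregator.new(timeout: 10)
  SAMPLE_LINES.each { |l| agg.add(parse_cef(l), 0) }
  [agg.flush(5), agg.flush(10)]
end

if __FILE__ == $0
  before, after = aggregate_sample
  puts "pushed at t=5: #{before.size}, pushed at t=10: #{after.size}"
  after.each { |e| puts "#{e['task_id']}: #{e['count']} events" }
end
```

Two of the three constructed lines share src/dst and collapse into one summary with count 2; the third becomes its own summary. Note the same caveat applies to the real filter: the map only contains what the `code` block stores in it, so src and dst have to be written into `map` for the pushed event to carry them, and the plugin must run with a single pipeline worker.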