Aggregation rule

Hi there!

I have plenty of events that look like this:

Feb 25 15:05:50 xfirewall CEF:0|infotecs|xf|5.4|62|Non-encrypted forwarded IP packet passed|5|start=1740477743000 end=1740477743000 src=*** dst=*** spt=53 dpt=38826 proto=udp cnt=2 in=492 deviceInboundInterface=eth9.2066 cn1=800h cn1Label=Eth protocol hex cs1=NAT no cs2=Drop no cs3=Broadcast no cs4=Forward yes cs5=Encrypted no cs1Label=Flags cs6=unknown cs6Label=Application name deviceDirection=inbound app=unknown suser= deviceExternalId=05bf2548

I need to create an aggregation rule that aggregates such events by these fields: src, dst, spt, dpt, and the one that has the value "xfirewall" (I don't really know what this field is called).

I tried different things, even AI, but it would not produce a correct rule. Help, please.

What do you want to aggregate? Do you want to combine all events that have the same src/dst/spt/dpt into a single event?

Yeah, exactly

OK. Your messages appear to be in syslog format with a CEF payload. I'd use two pipelines to process that. The first one just feeds sample events to the second over TCP:

input { generator { count => 1 lines => [
'Feb 25 15:05:50 xfirewall CEF:0|infotecs|xf|5.4|62|Non-encrypted forwarded IP packet passed|5|start=1740477743000 end=1740477743000 src=127.4.8.23 dst=127.5.1.82 spt=53 dpt=38826 proto=udp cnt=2 in=492 deviceInboundInterface=eth9.2066 cn1=800h cn1Label=Eth protocol hex cs1=NAT no cs2=Drop no cs3=Broadcast no cs4=Forward yes cs5=Encrypted no cs1Label=Flags cs6=unknown cs6Label=Application name deviceDirection=inbound app=unknown suser= deviceExternalId=05bf2548',
'Feb 25 15:05:51 xfirewall CEF:0|infotecs|xf|5.4|62|Non-encrypted forwarded IP packet passed|5|start=1740477743000 end=1740477743000 src=127.4.8.23 dst=127.5.1.82 spt=53 dpt=38826 proto=udp cnt=2 in=492 deviceInboundInterface=eth9.2066 cn1=800h cn1Label=Eth protocol hex cs1=NAT no cs2=Drop no cs3=Broadcast no cs4=Forward yes cs5=Encrypted no cs1Label=Flags cs6=unknown cs6Label=Application name deviceDirection=inbound app=unknown suser= deviceExternalId=05bf2548',
'Feb 25 15:05:52 xfirewall CEF:0|infotecs|xf|5.4|62|Non-encrypted forwarded IP packet passed|5|start=1740477743000 end=1740477743000 src=127.4.8.22 dst=127.5.1.82 spt=53 dpt=55333 proto=udp cnt=2 in=492 deviceInboundInterface=eth9.2066 cn1=800h cn1Label=Eth protocol hex cs1=NAT no cs2=Drop no cs3=Broadcast no cs4=Forward yes cs5=Encrypted no cs1Label=Flags cs6=unknown cs6Label=Application name deviceDirection=inbound app=unknown suser= deviceExternalId=05bf2548' ] } }
output { tcp { host => "127.2.2.2" port => 5678 codec => line { format => "%{message}" } } }

and connect that to

input { tcp { host => "127.2.2.2" port => 5678 codec => cef { delimiter => "
" ecs_compatibility => "disabled" } } }
output { stdout {} }

filter {
    dissect { mapping => { "syslog" => "%{[@metadata][ts]} %{+[@metadata][ts]} %{+[@metadata][ts]} %{hostname}" } }
    date { match => [ "[@metadata][ts]", "MMM dd HH:mm:ss", "MMM  d HH:mm:ss" ] }

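    aggregate {
        # One task per src/sport/dst/dport combination.
        # Append +%{hostname} to the pattern to also key on the syslog host.
        task_id => "%{sourceAddress}+%{sourcePort}+%{destinationAddress}+%{destinationPort}"
        code => '
            # Collect every event for this task in an array.
            map["events"] ||= []
            map["events"] << event.to_hash

            # Track the first and last timestamps seen.
            map["startTime"] ||= event.get("@timestamp")
            map["endTime"]     = event.get("@timestamp")

            # Drop the original event; only the aggregated map is emitted.
            event.cancel
        '
        # Emit the map as a new event once the task times out.
        push_map_as_event_on_timeout => true
        timeout => 6
        timeout_code => '
        '
    }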
}

If you do not include ecs_compatibility => "disabled" on the codec, then the task_id option would be %{[source][ip]}+%{[source][port]}+%{[destination][ip]}+%{[destination][port]}
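To make the grouping concrete, here is a minimal plain-Ruby sketch of what the task_id pattern and the code block do to the three sample lines. It runs outside Logstash: plain hashes stand in for parsed events, the "ts" key stands in for @timestamp, and the names SAMPLE_EVENTS, task_id and aggregate are hypothetical helpers, not part of the aggregate filter.

```ruby
# Stand-ins for the three generator lines after the cef codec has parsed them
# (field names assume ecs_compatibility => "disabled"; other fields omitted).
SAMPLE_EVENTS = [
  { "ts" => "Feb 25 15:05:50", "sourceAddress" => "127.4.8.23", "sourcePort" => "53",
    "destinationAddress" => "127.5.1.82", "destinationPort" => "38826" },
  { "ts" => "Feb 25 15:05:51", "sourceAddress" => "127.4.8.23", "sourcePort" => "53",
    "destinationAddress" => "127.5.1.82", "destinationPort" => "38826" },
  { "ts" => "Feb 25 15:05:52", "sourceAddress" => "127.4.8.22", "sourcePort" => "53",
    "destinationAddress" => "127.5.1.82", "destinationPort" => "55333" }
].freeze

KEY_FIELDS = %w[sourceAddress sourcePort destinationAddress destinationPort].freeze

# Same shape as the task_id pattern: the four fields joined with "+".
def task_id(event)
  KEY_FIELDS.map { |field| event[field] }.join("+")
end

# Mirrors the body of the code option, with a plain Hash standing in for each task map.
def aggregate(events)
  maps = {}
  events.each do |event|
    map = (maps[task_id(event)] ||= {})
    map["events"] ||= []
    map["events"] << event
    map["startTime"] ||= event["ts"]
    map["endTime"] = event["ts"]
  end
  maps
end

MAPS = aggregate(SAMPLE_EVENTS)
MAPS.each do |id, map|
  puts "#{id}: #{map['events'].size} event(s), #{map['startTime']} to #{map['endTime']}"
end
```

The first two lines share all four fields and land in one map; the third differs in src and dpt, so it gets its own.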

It's not at all clear to me that an array of events is useful. It may be better to let Elasticsearch aggregate over a connection using the src/dst IP addresses and port numbers.
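If the array turns out not to be useful, one alternative is to keep running totals per connection instead. The following is a rough plain-Ruby sketch of that idea, not a tested Logstash configuration. It assumes the cef codec (with ECS compatibility disabled) expands cnt and in to baseEventCount and bytesIn and leaves them as strings; fold_event, "eventCount" and "ts" are hypothetical names. Inside the aggregate filter the same logic would read fields with event.get(...) rather than event[...].

```ruby
# Folds one parsed event into the running summary for its connection.
# map stands in for the aggregate filter's per-task map, event for a parsed event.
def fold_event(map, event)
  map["eventCount"] = map.fetch("eventCount", 0) + 1
  # Assumed field names; to_i turns a missing or empty value into 0.
  map["baseEventCount"] = map.fetch("baseEventCount", 0) + event["baseEventCount"].to_i
  map["bytesIn"] = map.fetch("bytesIn", 0) + event["bytesIn"].to_i
  map["startTime"] ||= event["ts"]
  map["endTime"] = event["ts"]
  map
end

# Two events from the same connection, taken from the sample lines (cnt=2 in=492).
CONNECTION_EVENTS = [
  { "ts" => "Feb 25 15:05:50", "baseEventCount" => "2", "bytesIn" => "492" },
  { "ts" => "Feb 25 15:05:51", "baseEventCount" => "2", "bytesIn" => "492" }
].freeze

SUMMARY = CONNECTION_EVENTS.reduce({}) { |map, event| fold_event(map, event) }
puts SUMMARY
```

The summary stays the same size no matter how many events a connection produces, which matters if the goal is to reduce load downstream.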

Out of curiosity, what is the reason behind this?

While this is possible, as Badger demonstrated, it is unusual: firewall and network devices normally log one event per line. It can also be extremely inefficient, as you would need to run Logstash with just one pipeline worker.

Also, this may lead to unrelated events being aggregated together, as source ports are reused.
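The port-reuse concern can be illustrated with a small plain-Ruby simulation. This is only a sketch under stated assumptions: it groups by event time in seconds, whereas the aggregate filter, as far as I know, drives its timeout from processing time unless configured otherwise. group_with_timeout and the sample data are hypothetical.

```ruby
# Groups events per key, closing a group once more than `timeout` seconds
# have passed since the first event of that group.
def group_with_timeout(events, timeout)
  active = {}   # key => group currently collecting events
  closed = []   # groups that have already timed out
  events.sort_by { |e| e[:t] }.each do |e|
    group = active[e[:key]]
    if group && e[:t] - group[:first] > timeout
      closed << group
      group = nil
    end
    group ||= (active[e[:key]] = { key: e[:key], first: e[:t], count: 0 })
    group[:count] += 1
  end
  closed + active.values
end

KEY = "127.4.8.23+53+127.5.1.82+38826"

# Two events from one flow, then an unrelated flow reusing the same ports an hour later.
FLOW_EVENTS = [
  { key: KEY, t: 0 },
  { key: KEY, t: 1 },
  { key: KEY, t: 3600 }
].freeze

SHORT_WINDOW = group_with_timeout(FLOW_EVENTS, 6).map { |g| g[:count] }
LONG_WINDOW  = group_with_timeout(FLOW_EVENTS, 86_400).map { |g| g[:count] }

puts "timeout 6s:     #{SHORT_WINDOW.inspect}"
puts "timeout 86400s: #{LONG_WINDOW.inspect}"
```

With a short timeout the later flow ends up in its own group; with a very long one it is merged into the earlier flow, which is the failure mode described above.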

The thing is, I work in a SOC, so we basically collect events from almost every device. Recently, our SIEM system has been under heavy load because of these events, so we decided to aggregate such logs to reduce it. In the end, the SIEM would process far fewer events from XFirewall.

What IP should I put in "host" (in the input)? Is it my Logstash server's IP, or that of the device that sends these logs?

127.2.2.2 is an address on the loopback network, just like 127.0.0.1. It will work on every system I am familiar with. It is one of your Logstash server's IP addresses.