Logstash - Aggregate Syslog events based on multiple fields

I am receiving Syslog events from a CGNAT device, which I am able to parse properly.

My issue is the large number of logs: I receive a separate log event for each different source port (a field that is irrelevant to me), which adds up to a huge number of logs.

Here is a sample of log events for my case:

Sample Logs
January 15th 2019, 14:00:21.153 "LSN_ADD""""TCP""""","443""1547553621152"
January 15th 2019, 14:00:20.290 "LSN_ADD""""TCP""""","443""1547553620289"
January 15th 2019, 14:00:08.567 "LSN_ADD""""TCP""""","443""1547553608567"

(The IP Addresses are changed for confidentiality)

The logs are parsed successfully with grok into the following fields:

@timestamp, deviceAddress, Action, sourceIP, sourceport, protocol, NattedSourceIP, nattedsourcePort, destinationIP, destinationPort, SessionID

The three events share the following fields of interest within a short time window:
sourceIP, protocol, NattedSourceIP, destinationIP, destinationPort

Can I use the aggregate plugin in my case to receive a single event in the following format:
@timestamp (either first or last), sourceIP, protocol, NattedSourceIP, destinationIP, destinationPort, sourcePortCount(=3)

I was trying the following aggregate rule, but it's not doing anything at all:

aggregate {
    task_id => "%{sourceIP}%{protocol}%{NattedSourceIP}%{destinationIP}%{destinationPort}"
    code => "map['sourcePortCount'] ||= 0; map['sourcePortCount'] += 1;"
    push_map_as_event_on_timeout => true
    timeout_tags => ['_aggregatetimeout']
    timeout_code => "event.set('several_events', event.get('sourcePortCount') > 1)"
}


aggregate {
    task_id => "%{sourceIP}|%{protocol}|%{NattedSourceIP}|%{destinationIP}|%{destinationPort}"
    code => "
        map['sourcePortCount'] ||= 0; map['sourcePortCount'] += 1;
        map['protocol'] = event.get('protocol')
        map['sourceIP'] = event.get('sourceIP')
        map['NattedSourceIP'] = event.get('NattedSourceIP')
        map['destinationIP'] = event.get('destinationIP')
        map['destinationPort'] = event.get('destinationPort')
        map['@timestamp'] = event.get('@timestamp')
    "
    push_map_as_event_on_timeout => true
    timeout => 2 # You might want to increase this from 2 seconds
    timeout_code => "
        event.set('[@metadata][wanted]', true)
    "
}
if ![@metadata][wanted] { drop {} } # Drop the raw events, just keep the aggregates
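
To make the grouping logic easier to follow, here is a rough plain-Ruby sketch of what the aggregate block above is doing. It runs outside Logstash and is only an illustration, not how the plugin is implemented. The names `KEY_FIELDS` and `aggregate_events` are made up for this example, and the sample events use invented addresses from the documentation ranges, since the real IPs were removed from the logs.

```ruby
# Plain-Ruby sketch, not Logstash code. KEY_FIELDS and aggregate_events are
# hypothetical names used only for this illustration.
KEY_FIELDS = %w[sourceIP protocol NattedSourceIP destinationIP destinationPort].freeze

# Groups events by the same composite key as task_id and counts how many
# events share it, similar to what the code option does with the map.
def aggregate_events(events)
  maps = {}
  events.each do |event|
    task_id = KEY_FIELDS.map { |field| event[field] }.join('|')
    map = (maps[task_id] ||= {})
    map['sourcePortCount'] ||= 0
    map['sourcePortCount'] += 1
    KEY_FIELDS.each { |field| map[field] = event[field] }
    map['@timestamp'] = event['@timestamp'] # the last event's timestamp wins
  end
  maps.values
end

# Made-up events: same flow, three different source ports.
common = {
  'sourceIP' => '192.0.2.10', 'protocol' => 'TCP',
  'NattedSourceIP' => '198.51.100.7', 'destinationIP' => '203.0.113.5',
  'destinationPort' => '443'
}
events = [
  common.merge('@timestamp' => '2019-01-15T14:00:08.567Z', 'sourceport' => '50001'),
  common.merge('@timestamp' => '2019-01-15T14:00:20.290Z', 'sourceport' => '50002'),
  common.merge('@timestamp' => '2019-01-15T14:00:21.153Z', 'sourceport' => '50003')
]

result = aggregate_events(events)
puts result.inspect
```

In Logstash itself the map is only emitted as a new event once the task's timeout expires, so the count covers whatever arrived for that key within that window. Because the timestamp is overwritten on every event, the aggregate carries the timestamp of the last event seen; use `||=` on that line instead if you would rather keep the first.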