Is it possible to aggregate logs from a CEF input?

Hi,

Is it possible to apply the aggregate filter to CEF logs? Would you please check my conf file for anything wrong? It should aggregate events when name and sourceAddress are the same, and increase the field aggregatedEvents by 1 each time. Below is the conf file:

    input {
        udp {
            port => 516
            type => "Forcepoint"
            codec => "cef"
        }
    }

    filter {
        aggregate {
            task_id => "%{name}_%{sourceAddress}"
            code => "
                map['aggregatedEvents'] ||= 0;
                map['aggregatedEvents'] += 1;
            "
            timeout => 30
        }
    }

    output {
        elasticsearch {
            hosts => ["10.xx.xx.xx:9200"]
            index => "logstash-fpnofilter-%{+MM.dd.YYYY}"
        }
    }

It's possible, yes. But if I am reading it correctly, you add aggregatedEvents to the map and then expire it 30 seconds later. You don't push the map as an event on timeout, and you have no end-task.

What do you want to do with this aggregatedEvents counter and when do you want to do it?

I made it as simple as possible.

aggregatedEvents is to show how many events went into creating the aggregated log.

For example, if it's 3, that means 3 logs were included to make the output aggregated log.

You didn't really answer either of my questions, but does this help you?

    aggregate {
        task_id => "%{name}_%{sourceAddress}"
        code => "
            event.cancel()                 # drop the original event
            map['aggregatedEvents'] ||= 0;
            map['aggregatedEvents'] += 1;
        "
        push_map_as_event_on_timeout => true   # emit the map as a new event when the task times out
        timeout_task_id_field => "task_id"     # write the task id into the emitted event
        timeout => 5
    }
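With that filter, each timeout pushes one new event built from the map contents, roughly like this (illustrative values only, plus the usual @timestamp and @version fields Logstash adds):

    {
        "task_id"          => "Transaction blocked_10.1.1.5",
        "aggregatedEvents" => 3
    }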

Thank you so much, Badger

I copied the aggregate section and it's working and aggregating. However, I am currently getting only the fields task_id and aggregatedEvents. How do I get the fields name and sourceAddress separately?
And again, thank you for your full support.

Add them as additional entries in the map.
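For example, a minimal sketch (assuming the CEF codec produced name and sourceAddress fields, the same ones used in the task_id):

    code => "
        event.cancel()
        map['name']          ||= event.get('name')
        map['sourceAddress'] ||= event.get('sourceAddress')
        map['aggregatedEvents'] ||= 0
        map['aggregatedEvents'] += 1
    "

The ||= means each field is captured once, from the first event of the task, and then carried into the pushed event.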

I did, and I am successfully seeing the sourceAddress field in Kibana. How can I delete the task_id field? I don't need to save it in the index.


Remove

    timeout_task_id_field => "task_id"

I tried to use it, but it didn't work.

I said to remove that. That is what causes it to add task_id to the event.
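In other words, the filter becomes the same block as before, just without that line:

    aggregate {
        task_id => "%{name}_%{sourceAddress}"
        code => "
            event.cancel()
            map['aggregatedEvents'] ||= 0
            map['aggregatedEvents'] += 1
        "
        push_map_as_event_on_timeout => true
        timeout => 5
    }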

It works, thank you.

Another question:
I need to aggregate the bytesIn and bytesOut fields, which are integer fields. I mean, if there are three logs with bytesIn (500, 400, 600), the final aggregated bytesIn field should be 1500. How do I do this in the code section of the aggregate filter?

Very similar to counting events:

    map['totalBytesIn'] ||= 0
    map['totalBytesIn'] += event.get('bytesIn')
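If some events might arrive without a bytesIn value, a slightly more defensive variant (my assumption, not something the thread required) avoids a nil error; bytesOut works the same way:

    # guard against events missing the field (assumption); .to_i also
    # copes with the value arriving as a string
    map['totalBytesIn']  ||= 0
    map['totalBytesIn']  += (event.get('bytesIn')  || 0).to_i
    map['totalBytesOut'] ||= 0
    map['totalBytesOut'] += (event.get('bytesOut') || 0).to_i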

Thank you,

I am seeing that some logs have the field tags = _aggregatefinalflush.

What does that mean?

It tells you those events were emitted due to the pipeline flush when logstash shut down. See this issue for a discussion. My advice is to ignore it.

Now the code is aggregating logs with the same task_id with a timeout of 30 secs.

What if I want a maximum of 50 logs aggregated within the 30 secs? How would I do this?

If you want to ignore everything after the first 50, then wrap some of the += lines in a test of map['aggregatedEvents'] against the 50 limit, as sketched below. If you want to push an event for the first 50 and then start a new aggregation, I am not sure that you can do that.
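A minimal sketch of the first option (testing before the increment, so the < 50 comparison caps the count at exactly 50; the totalBytesIn lines are carried over from the earlier answer):

    code => "
        event.cancel()
        map['aggregatedEvents'] ||= 0
        # only count and sum the first 50 events of each task
        if map['aggregatedEvents'] < 50
            map['aggregatedEvents'] += 1
            map['totalBytesIn'] ||= 0
            map['totalBytesIn'] += (event.get('bytesIn') || 0).to_i
        end
    "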

I am now doing geoip for the destination IP, as below. How can I map the field geoip.location so I can use it for a map visualization? Would you advise me? Thank you again for your full support.

I found it.

    map['destinationGeoIP'] = event.get('geoip')
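In context, that line copies the whole geoip object into the map so it survives aggregation. A sketch of how the filters could fit together (assuming the geoip filter runs before the aggregate filter, and that the CEF destination IP field is named destinationAddress; both are my assumptions):

    geoip {
        source => "destinationAddress"   # assumed CEF field holding the destination IP
    }
    aggregate {
        task_id => "%{name}_%{sourceAddress}"
        code => "
            event.cancel()
            map['destinationGeoIP'] ||= event.get('geoip')
            map['aggregatedEvents'] ||= 0
            map['aggregatedEvents'] += 1
        "
        push_map_as_event_on_timeout => true
        timeout => 30
    }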

Thank you

Hi Badger,

I am seeing that the Emitted events count is higher than the Received events count.
I thought the goal of aggregation was to decrease the Emitted events in comparison to the Received events.

I checked the aggregation and it's working fine. Need your advice.


Thank you

If you have monitoring enabled and drill into a pipeline, you will see that inputs emit events and outputs receive them. To me this is counter-intuitive; I think a pipeline should receive events, process them, and then emit them, but that's not how those terms are used in Logstash.