Logstash aggregate array of objects based on key field

I'm receiving some logs like these:

datetime="2020-09-2020T14:40:00-0300" action="tunnel-up" tunneltype="ssl-tunnel" tunnelid=1171992131 remip= user="someuser" group="vpn_ssl_group"


datetime="2020-09-2020T15:40:00-0300" action="tunnel-down" tunneltype="ssl-tunnel" tunnelid=1171992131 remip= user="someuser" group="vpn_ssl_group"

I'd like to be able to aggregate fields datetime and action grouped by the tunnelid field into an array of objects.

Final document should look like this:

    "datetime": "2020-09-2020T14:40:00-0300",
    "tunnelid": 1171992131,
    "user": "someuser",
    "group": "vpn_ssl_group",
    "remip": "",
    "logs": [{
        "datetime": "2020-09-2020T14:40:00-0300",
        "action": "tunnel-up"
        "datetime": "2020-09-2020T15:40:00-0300",
        "action": "tunnel-down"
    "duration": 3600

Is there any way I can do that? If so, can you please show me how?

Use an aggregate filter. Look at example 3 in the documentation.

Make sure you have a single pipeline worker thread. Note that when the filter pushes an event based on the map, the only fields on the event will be what you added to the map.

Must I have a single pipeline worker for every configuration file or can I specify the one which has the aggregation filter? If it's possible to split them, can I have a configuration file which keeps the input from the main conf and just adds the aggregation filter and put it in another pipeline?

The pipeline which contains the aggregate filter must only use a single worker thread. If you need to maintain event order then that applies to other pipelines too.

However, if you are OK with events getting re-ordered, which I expect you are for your use case then other pipelines could have multiple workers.

You can specify the number of workers for each pipeline in pipelines.yml

If the aggregate pipeline feeds the multi-worker pipeline then it is not going to scale well. If the multi-worker pipeline does a lot of work on each event (especially expensive calls like http, geoip, or dns filters) and then feeds the aggregate pipeline it may scale quite well.

1 Like

Thank you for the feedback, Badger, it's been very helpful! Last question, would you recommend pipeline to pipeline communication for this use case? By the way, I'm currently using tcp as the input and elasticsearch as the output plugins in the single configuration file I have set up.

Test it, measure it. If you cannot see a difference between the two options then choose the simpler one.

This topic was automatically closed 28 days after the last reply. New replies are no longer allowed.