Logstash Aggregate copy last Event to Map

Hello there,

I would like to combine several consecutive events, and the aggregate filter looks like the right tool for this. Since I don't have a specific start or end event, I am following example #3 in the Logstash documentation.

When the timeout fires and a new event is created from the map, it unfortunately contains only the fields that were explicitly assigned to the map. How can I include all fields of the last aggregated event? I think I need something like map['_source'] = event.get('[_source]') in my code section, but that line doesn't work.

Background:
My log contains ten consecutive entries with different parameters, and I would like to combine them into one event. A unique task_id is present. The events occur at irregular intervals, so using push_previous_map_as_event does not seem to make sense to me.

Thanks for your tips
Best regards
siiman

This might help.

Hi,

I have tried the following code that you suggested.

aggregate {
    task_id => "%{task_id}"
    code => "
        event.to_hash.each { |k,v|
            unless map[k]
                map[k] = v
            end
        }
    "
    push_map_as_event_on_timeout => true
    timeout => 3
    timeout_tags => [ 'agg_timeout' ]
    add_tag => [ "_aggregate" ]
    timeout_task_id_field => "task_id"
}

As far as I understand, it behaves exactly like the following:

code => "
    map.merge!(event)
"

Unfortunately, with both variants only the last event before the timeout ends up in the map, which I cannot explain.

Can you explain this behaviour, or suggest how to combine all events?
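
For comparison, here is a minimal sketch of how the two code variants behave on flat data in plain Ruby, outside Logstash. The hashes and field names (status, user, host) are made up for illustration and stand in for the map and for event.to_hash. Under that assumption, the "unless map[k]" variant keeps the first value seen for a key, while merge! keeps the last one.

```ruby
# Plain hashes stand in for event.to_hash; all field values are hypothetical.
events = [
  { 'task_id' => 't1', 'status' => 'started', 'user' => 'alice' },
  { 'task_id' => 't1', 'status' => 'running', 'host' => 'web01' },
  { 'task_id' => 't1', 'status' => 'done' }
]

# Variant 1: copy a field only if the map has no truthy value for it yet,
# so the first value seen for each key wins.
first_wins = {}
events.each do |e|
  e.each { |k, v| first_wins[k] = v unless first_wins[k] }
end

# Variant 2: merge! overwrites existing keys, so the last value wins.
last_wins = {}
events.each { |e| last_wins.merge!(e) }

puts first_wins.inspect
puts last_wins.inspect
```

In both cases fields that appear in only one event (user, host) survive. The variants differ only for keys that occur in several events.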

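Alright, the problem is solved.

A plain merge does not work with nested hashes (JSON structure): it is shallow, so a nested hash from a later event replaces the one already in the map instead of being combined with it. So here is my solution.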

aggregate {
    task_id => "%{task_id}"
    code => "
        merger = proc { |key, v1, v2| Hash === v1 && Hash === v2 ? v1.merge(v2, &merger) : v2 }
        map.merge!(event, &merger)
    "
    push_map_as_event_on_timeout => true
    timeout => 3
    timeout_tags => [ 'agg_timeout' ]
    add_tag => [ "_aggregate" ]
}
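
The difference between the plain merge and the recursive merge can be reproduced in plain Ruby, outside Logstash. This is only a sketch: the two hashes and the nested params field are invented for illustration and stand in for two events with the same task_id.

```ruby
# Two hypothetical events sharing a nested 'params' hash.
first  = { 'task_id' => 't1', 'params' => { 'a' => 1 } }
second = { 'task_id' => 't1', 'params' => { 'b' => 2 } }

# Shallow merge: the whole 'params' hash of the later event
# replaces the one already stored.
shallow = {}
shallow.merge!(first)
shallow.merge!(second)

# Recursive merge: when both sides hold a Hash for the same key,
# merge them again with the same block; otherwise the later value wins.
merger = proc { |_key, v1, v2| v1.is_a?(Hash) && v2.is_a?(Hash) ? v1.merge(v2, &merger) : v2 }
deep = {}
deep.merge!(first, &merger)
deep.merge!(second, &merger)

puts shallow.inspect
puts deep.inspect
```

With the shallow merge only the nested fields of the last event remain, which matches the behaviour described above. The recursive block keeps nested fields from every event and still lets later scalar values overwrite earlier ones.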