Hi all,
I'm new to the ELK family and currently encountering a problem.
I'm using the logstash-filter-aggregate to aggregate events that have a clear START event (second field containing 70), but no END event.
The task_id can be the same throughout the file.
The problem is that if I use the timeout variable to flush the map, it may be possible that a new START event arrives before the end of the timeout, and so that event is lost.
Here's an example of the log file:
task_id,var1,var2
id1,a70,aaa
id1,a30,bbb
id1,x70,ccc
id1,b40,ddd
and the config:
if [var1] =~ "70" {
  aggregate {
    task_id => "%{task_id}"
    # START event: begin a fresh aggregation, collecting var2 values in an array
    code => "
      map['var2'] = [event.get('var2')]
    "
    # 'create' only runs the code when no map exists yet for this task_id;
    # a later START for the same task_id is skipped
    map_action => "create"
  }
} else {
  aggregate {
    task_id => "%{task_id}"
    # any other event: append var2 to the running list (initialise it if a
    # non-START event arrives before the map exists)
    code => "
      (map['var2'] ||= []) << event.get('var2')
    "
    # timeout options are defined once per task_id pattern
    push_map_as_event_on_timeout => true
    timeout => 60
  }
}
So for the previous events I should have 2 events.
Any help will be appreciated.
thanks!
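One way to flush the still-open map as soon as a second START arrives for the same task_id, as an added example that is not the original poster's config: it assumes the three csv column names from the sample log and a version of logstash-filter-aggregate whose code option exposes the new_event_block variable.

```conf
filter {
  # column names assumed from the sample lines in the post
  csv { columns => ["task_id", "var1", "var2"] }

  if [var1] =~ "70" {
    aggregate {
      task_id => "%{task_id}"
      # create_or_update: the code runs even when a map for this task_id already exists,
      # so a second START is not silently skipped as it is with map_action => "create"
      map_action => "create_or_update"
      code => "
        # a previous START is still open: push its map into the pipeline as its own event
        # (new_event_block is assumed to be available in the installed plugin version)
        if map['var2']
          new_event_block.call(LogStash::Event.new('task_id' => map['task_id'],
                                                  'var2'    => map['var2']))
        end
        # then start a fresh aggregation for this START event
        map['task_id'] = event.get('task_id')
        map['var2']    = [event.get('var2')]
      "
      # timeout options may only be set on one aggregate filter per task_id pattern;
      # this still flushes the last open map when no further START arrives
      push_map_as_event_on_timeout => true
      timeout => 60
    }
  } else {
    aggregate {
      task_id => "%{task_id}"
      # update: a non-START event arriving before any START is passed through untouched
      # instead of creating a stray map
      map_action => "update"
      code => "
        (map['var2'] ||= []) << event.get('var2')
      "
    }
  }
}
```

For the four sample lines this yields one event with var2 => ["aaa", "bbb"] when the x70 START arrives, and a second event with var2 => ["ccc", "ddd"] when the 60 second timeout fires.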