Hi,
I am having issues with the aggregate filter. I have version 2.4 of this filter installed. I am running logstash 2.4 as a service on Ubuntu 16.04.
In filter's GitHub readme file I found a nice feature that should be helpful in my use case - the ability to aggregate the data when there is no end event. In order to do that, I did exactly what the example says ( only changing the code part for my use case), but it doesn't seem to work. No aggregated events are being indexed into my elasticsearch.
my filter config is the following:
aggregate {
task_id => "%{task_id}"
code => "
map['event_counter'] ||= 0 ; map['event_counter'] += 1;
map['duration'] ||= 0; map['duration'] += event['duration'];
map['L7_goodput'] ||= 0; map['L7_goodput'] += event['L7_goodput'];
map['L7_throughput'] ||= 0; map['L7_throughput'] += event['L7_throughput'];
map['QoE_KBs'] ||= 0; map['QoE_KBs'] += event['QoE_KBs'];
map['QoE_Kbps'] ||= 0; map['QoE_Kbps'] += event['QoE_Kbps'];
"
push_map_as_event_on_timeout => true
timeout_task_id_field => "task_id"
timeout => 60
timeout_tags => ['_aggregatetimeout']
timeout_code => "
event['QoE_KBs'] = event['QoE_KBs']/event['event_counter'];
event['QoE_Kbps'] = event['QoE_Kbps']/event['event_counter'];
event['L7_goodput_GB'] = event['L7_goodput']/((1024.0)**3);
event['L7_throughput_GB'] = event['L7_throughput']/((1024.0)**3);
"
}
(task_id is a field that I add to the event in a previous filter)
Lines are being written to log files monitored by filebeat and sent to logstash. All the lines are indexed properly into elasticsearch, however, there is not a single event in elastic with the tag _aggregatetimeout
so something isn't working right. To me, it would appear that no events are being pushed after a 60 second timeout. Maybe I'm missing something...?
I have set workers to 1 and the logs to verbose but I don't see anything in the logs.
I have no idea how to fix or debug this. Any help would be greatly appreciated.
Edit: went ahead a tried the example from github.
filter {
grok {
match => [ "message", "%{LOGLEVEL:loglevel} - %{NOTSPACE:user_id} - %{GREEDYDATA:msg_text}" ]
}
aggregate {
task_id => "%{user_id}"
code => "map['clicks'] ||= 0; map['clicks'] += 1;"
push_map_as_event_on_timeout => true
timeout_task_id_field => "user_id"
timeout => 600 # 10 minutes timeout
timeout_tags => ['_aggregatetimeout']
timeout_code => "event['several_clicks'] = (event['clicks'] > 1)"
}
}
with these logs:
INFO - 12345 - Clicked One
INFO - 12345 - Clicked Two
INFO - 12345 - Clicked Three
And it also doesn't work. So I am defiantly missing something...Help ?