Hi All,
I have a Logstash pipeline that uses the elasticsearch input plugin and the aggregate filter plugin. The aggregate filter has a timeout of 15 seconds. Here is the filter configuration:
aggregate {
  task_id => "%{[test.region.keyword]}"
  code => "
    map['total_count_per_region'] ||= 0;
    map['total_count_per_region'] += event.get('Count');
  "
  push_map_as_event_on_timeout => true
  timeout_task_id_field => "test.region.keyword"
  timeout => 15
  timeout_tags => ['_aggregatetimeout']
  timeout_code => "
    event.set('total_count_per_region', event.get('total_count_per_region'));
  "
}
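To make the intent clearer, here is a rough plain-Ruby sketch of what I expect this filter to compute. It is only an illustration, not the plugin's actual implementation: the sample events, region names, and counts are made up, and the `maps` hash just stands in for the per-task map that the aggregate filter keeps.

```ruby
# Plain-Ruby sketch of the per-region sum the aggregate filter is meant to build.
# The sample events are hypothetical, not taken from my index.
events = [
  { 'test.region.keyword' => 'east', 'Count' => 3 },
  { 'test.region.keyword' => 'west', 'Count' => 5 },
  { 'test.region.keyword' => 'east', 'Count' => 4 }
]

# One map per task id (region); this stands in for the filter's internal state.
maps = Hash.new { |hash, region| hash[region] = {} }

events.each do |event|
  map = maps[event['test.region.keyword']]
  # Same logic as the `code` option in the config above.
  map['total_count_per_region'] ||= 0
  map['total_count_per_region'] += event['Count']
end

# On timeout I expect one new event per region, carrying the task id and the tag.
timeout_events = maps.map do |region, map|
  map.merge('test.region.keyword' => region, 'tags' => ['_aggregatetimeout'])
end

timeout_events.each { |e| puts e.inspect }
```

So for each region I expect a single event with the summed `total_count_per_region`, emitted once the 15-second timeout expires.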
The pipeline reads documents from an index, and the filter aggregates them as shown above. The aggregate filter should emit each aggregated result as a new event once the 15-second timeout expires. The pipeline starts without issues, but it terminates about 4 seconds later, before the timeout is reached, so the aggregation events are never generated and never appear in my output. Here is the relevant log output:
[2021-03-23T14:12:24,610][INFO ][logstash.javapipeline ][pTest] Starting pipeline {:pipeline_id=>"pTest", "pipeline.workers"=>3, "pipeline.batch.size"=>1500, "pipeline.batch.delay"=>600, "pipeline.max_inflight"=>4500, "pipeline.sources"=>["/etc/logstash/conf.d/test1.conf"], :thread=>"#<Thread:0x7eabfdb5 run>"}
[2021-03-23T14:12:24,689][INFO ][logstash.agent ] Pipelines running {:count=>2, :running_pipelines=>[:".monitoring-logstash", :pTest], :non_running_pipelines=>[ ]}
[2021-03-23T14:12:28,601][INFO ][logstash.javapipeline ][pTest] Pipeline terminated {"pipeline.id"=>"pTest"}
Could someone please help me resolve this?