Logsash pipeline terminating before timeout duration of aggregation filter plugin

Hi All,

I am having a logstash pipeline that has an elasticsearch plugin for input and aggregation filter plugin. The aggregation filter plugin has a timeout duration of 15 seconds. Refer to the filter plugin configuration

aggregate {
		task_id => "%{[test.region.keyword]}"
		code => "
				map['total_count_per_region'] ||= 0; 
				map['total_count_per_region'] += event.get('Count');
		push_map_as_event_on_timeout => true
		timeout_task_id_field => "test.region.keyword"
		timeout => 15 
		timeout_tags => ['_aggregatetimeout']
		timeout_code => "
						event.set('total_count_per_region', event.get('total_count_per_region'));

My pipeline configuration fetches documents from index as input and the filter plugin performs an aggregation using the aggregation plugin as shown above. This is going to generate aggregation result as an event after 15 seconds. But when I start my pipeline, it starts successfully without issues. However, before the aggregation events are generated(after 15 seconds) the pipeline is terminated and hence I don't get those events in my output.

 [2021-03-23T14:12:24,610][INFO ][logstash.javapipeline    ][pTest] Starting pipeline {:pipeline_id=>"pTest", "pipeline.workers"=>3, "pipeline.batch.size"=>1500, "pipeline.batch.delay"=>600, "pipeline.max_inflight"=>4500, "pipeline.sources"=>["/etc/logstash/conf.d/test1.conf"], :thread=>"#<Thread:0x7eabfdb5 run>"}
 [2021-03-23T14:12:24,689][INFO ][logstash.agent           ] Pipelines running {:count=>2, :running_pipelines=>[:".monitoring-logstash", :pTest], :non_running_pipelines=>[  ]}
 [2021-03-23T14:12:28,601][INFO ][logstash.javapipeline    ][pTest] Pipeline terminated {"pipeline.id"=>"pTest"}

Please help in how to resolve this

You are probably hitting this.

I figured out the reason for the issue I was facing. I was using the elasticsearch input plugin without a schedule attribute and hence, by pipeline ran once when I started/restarted logstash and then terminated after the run. I had to schedule it to run once per day (which was my requirement as well) using the schedule attribute and that made sure that my pipeline was in running state.

This resolved the issue I have posted about. When the aggregation event was generated 15 seconds after my pipeline ran each day, my pipeline is still in running state and the events go through to the output successfully.

Thank you