Pipeline to Pipeline forked configuration to enable use of the aggregation filter

I've set up a pipeline-to-pipeline configuration following the advice given in the topic "Pipeline.workers configuration and aggregation filter".

What I'm trying to solve is being able to use the aggregate filter without degrading performance by running everything through a single worker.

I have the intake pipeline set up with 4 workers, and it sends to two other pipelines, multi-worker and single-worker, as described in the Pipeline to Pipeline guide.

The multi-worker pipeline is configured with 4 workers and the single-worker pipeline with 1 worker.
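
For completeness, the receiving side of each pipeline looks roughly like this; the addresses are assumed to match the send_to list in the intake output further down (the actual multi-worker.conf and single-worker.conf are not included in this post):

# multi-worker.conf (assumed shape)
input {
    pipeline {
        address => "multi-worker"
    }
}

# single-worker.conf (assumed shape) - this is where the aggregate filter below runs
input {
    pipeline {
        address => "single-worker"
    }
}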

I'm trying to aggregate a message that is seen every minute and then produce an event when that line has not been seen for 70 seconds, using the inactivity_timeout option of the aggregate filter.

The issue I see is that the lines are not being aggregated as expected, which makes the inactivity timeout fire with only 1 to 3 of the lines I want to aggregate in each map.

if [channel_name] =~ /.*/ {
	aggregate {
		# one aggregate map per channel_name/application_name combination
		task_id => 'aggregated_%{channel_name}_%{application_name}'
		code => '
			map["lines"] ||= 0
			map["lines"] += 1
			map["times_seen"] ||= []
			map["times_seen"] << Time.now()
		'
		timeout_task_id_field => 'lines_id'
		timeout => 3600
		inactivity_timeout => 70
		timeout_code => '
			event.set("timed_out_at", Time.now())
		'
		timeout_tags => ['_aggregatetimeout']
		timeout_timestamp_field => '@timestamp'
		# on timeout, push the accumulated map as a new event
		push_map_as_event_on_timeout => true
	}
}

Pipelines (pipelines.yml)

- pipeline.id: intake
  path.config: "/usr/share/logstash/pipeline/intake.conf"
  pipeline.workers: 4
  pipeline.ordered: false
  queue.type: persisted
- pipeline.id: multi-worker
  path.config: "/usr/share/logstash/pipeline/multi-worker.conf"
  pipeline.workers: 4
  pipeline.ordered: false
  queue.type: persisted
- pipeline.id: single-worker
  path.config: "/usr/share/logstash/pipeline/single-worker.conf"
  pipeline.workers: 1
  pipeline.ordered: true
  queue.type: persisted

Intake output

output { 
    pipeline { 
        send_to => ["multi-worker", "single-worker"] 
    }
}

All lines are seen every minute.
All lines have the same 'application_name' and 'channel_name' used in task_id.
All lines are being added to an aggregate map, BUT not to a single map as I would expect.
This makes the inactivity timeout fire multiple times, when it should not fire at all unless the log line is not seen for more than 70 seconds.

Any ideas of what could be going wrong?

Does this also happen if you set all pipelines to use a single worker?

The aggregate filter requires a single worker, but if the data is coming from other pipelines that are not using a single worker, that may also affect how it works.

Running the intake pipeline with 4 workers will result in unordered events being sent to the "single-worker" pipeline. The "intake" pipeline has to maintain order, which means it also has to have a single worker. And yes, that will impact the overall throughput.
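
In other words, the suggested change boils down to roughly this for the intake entry in pipelines.yml (a sketch based on the advice above, not a tested configuration):

- pipeline.id: intake
  path.config: "/usr/share/logstash/pipeline/intake.conf"
  pipeline.workers: 1
  pipeline.ordered: true
  queue.type: persisted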

Thank you for the clarification.

Thank you.