I've set up a pipeline-to-pipeline configuration following the advice given in the topic "Pipeline.workers configuration and aggregation filter".
What I'm trying to do is use the aggregate filter without degrading performance by running everything through a single worker.
I have the input pipeline set up with 4 workers, and it sends to 2 other pipelines, multi_worker_pipeline and single_worker_pipeline, as described in the Pipeline to Pipeline guide.
The multi-worker pipeline is configured with 4 workers and the single-worker pipeline is configured with 1 worker.
I'm trying to aggregate a message that is seen every minute and then produce an event when the line has not been seen for 70 seconds, using the inactivity_timeout option of the aggregate filter.
The issue I see is that the lines are not being aggregated as expected, which makes the inactivity timeout fire after only 1 to 3 lines of the message I want to aggregate.
if [channel_name] =~ /.*/ {
  aggregate {
    task_id => 'aggregated_%{channel_name}_%{application_name}'
    code => '
      map["lines"] ||= 0
      map["lines"] += 1
      map["times_seen"] ||= []
      map["times_seen"] << Time.now()
    '
    timeout_task_id_field => 'lines_id'
    timeout => 3600
    inactivity_timeout => 70
    timeout_code => '
      event.set("timed_out_at", Time.now())
    '
    timeout_tags => ['_aggregatetimeout']
    timeout_timestamp_field => '@timestamp'
    push_map_as_event_on_timeout => true
  }
}
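To make the expected behaviour concrete, here is a minimal sketch in plain Ruby, run outside Logstash. It is a toy model, not the aggregate filter's actual implementation: the class `InactivityModel`, its methods, and the placeholder task id are all hypothetical. It assumes one map per task_id and an inactivity timeout that fires only when a map has been idle for more than 70 seconds.

```ruby
# Toy model of the behaviour the post expects; NOT the aggregate filter's code.
# InactivityModel, observe and expire are hypothetical names.
class InactivityModel
  attr_reader :maps, :flushed

  def initialize(inactivity_timeout)
    @inactivity_timeout = inactivity_timeout
    @maps = {}     # task_id => { "lines" => count, "last_seen" => seconds }
    @flushed = []  # maps that were pushed as timeout events
  end

  # Called once per incoming line; `now` is a time in seconds.
  def observe(task_id, now)
    expire(now)
    map = (@maps[task_id] ||= { "lines" => 0 })
    map["lines"] += 1
    map["last_seen"] = now
  end

  # Push and drop every map that has been idle longer than the timeout.
  def expire(now)
    @maps.delete_if do |task_id, map|
      next false unless now - map["last_seen"] > @inactivity_timeout
      @flushed << map.merge("lines_id" => task_id)
      true
    end
  end
end

model = InactivityModel.new(70)
task_id = "aggregated_chan_app" # placeholder channel_name / application_name

# One line per minute: the gap never exceeds 70 s, so nothing is flushed.
[0, 60, 120, 180].each { |t| model.observe(task_id, t) }
model.expire(240) # 60 s after the last line: still no timeout event
model.expire(251) # 71 s after the last line: a single timeout event
```

In this model all four lines land in one map and exactly one timeout event is produced, and only once the line stops arriving. That is the outcome the configuration above is expected to give.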
Pipelines
- pipeline.id: intake
  path.config: "/usr/share/logstash/pipeline/intake.conf"
  pipeline.workers: 4
  pipeline.ordered: false
  queue.type: persisted
- pipeline.id: multi-worker
  path.config: "/usr/share/logstash/pipeline/multi-worker.conf"
  pipeline.workers: 4
  pipeline.ordered: false
  queue.type: persisted
- pipeline.id: single-worker
  path.config: "/usr/share/logstash/pipeline/single-worker.conf"
  pipeline.workers: 1
  pipeline.ordered: true
  queue.type: persisted
Intake output
output {
  pipeline {
    send_to => ["multi-worker", "single-worker"]
  }
}
All lines are seen every minute.
All lines have the same 'application_name' and 'channel_name' used in task_id.
All lines are getting added to an aggregated map, but not to a single map as I would expect.
This makes the inactivity timeout fire multiple times, when it should not fire at all unless the log line has gone unseen for more than 70 seconds.
Any ideas on what could be going wrong?