I have a long config that handles several types of messages. It is very similar to the canonical Example 1 on the aggregate plugin docs page. Here's the gist:
filter {
  grok {
    match => { ... }
    add_field => { 'type' => 'type_a' }
  }
  if (! [type]) {
    grok {
      match => { ... }
      add_field => { 'type' => 'type_b' }
    }
  }
  if (! [type]) {
    grok {
      match => { ... }
      add_field => { 'type' => 'type_c' }
    }
  }
  if ([type] == 'type_b') {
    aggregate {
      task_id => '...'
      map_action => 'create'
      code => "..."
    }
    drop {}
  }
  if ([type] == 'type_c') {
    aggregate {
      task_id => '...'
      end_of_task => true
      timeout => 60
      map_action => 'update'
      code => "..."
    }
  }
}
Otherwise the config seems to be working: from what I am seeing, it picks up the correct types and other fields as needed.
I've confirmed I have one worker:
Starting pipeline {:pipeline_id=>"main", "pipeline.workers"=>1, "pipeline.batch.size"=>1000, "pipeline.batch.delay"=>50, "pipeline.max_inflight"=>1000, :thread=>"#<Thread:0x754f9f28 run>"}
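For reference, the worker count shown in that log line maps to the standard pipeline.workers setting. A minimal sketch, assuming it is set in logstash.yml (the -w 1 command-line flag is the equivalent); where the setting lives is an assumption, since the post only shows the resulting log line:

```yaml
# logstash.yml (assumed location of the setting; not shown in the original post)
pipeline.workers: 1   # a single worker thread runs the filter and output stages
```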
However, I still see events being processed out of order. For example, for a file such as:
type A
type B
type C
where type B and type C are aggregated as start / end events per the above config, I see it processing in this order:
type C
type A
which completely breaks things downstream.
What should I be doing differently or what else can I try to troubleshoot or solve this?