My pipeline consists of three JDBC inputs, a set of common filters, and a single Elasticsearch output. The three JDBC inputs fetch similar data from three different tables, the aggregate filter groups the data by a field that is present in all three tables, and the output stage dynamically sets the document type based on the table the data originated from.
input {
  jdbc {
    statement => "SELECT foo FROM table_a ORDER BY foo;"
  }
  jdbc {
    statement => "SELECT foo FROM table_b ORDER BY foo;"
  }
  jdbc {
    statement => "SELECT foo FROM table_c ORDER BY foo;"
  }
}
filter {
  aggregate {
    task_id => "%{foo}"
    code => "...."
    push_previous_map_as_event => true
  }
}
output {
  elasticsearch {
    action => "index"
    document_id => "%{foo}"
    document_type => "%{[@metadata][type]}"
  }
}
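(The config above is trimmed; in practice each input also has to tag where its events came from so that `%{[@metadata][type]}` resolves in the output. A minimal sketch of one such input, with the tag value being illustrative:)

```
jdbc {
  statement => "SELECT foo FROM table_a ORDER BY foo;"
  # add_field is a standard option on all inputs; here it records the source table
  add_field => { "[@metadata][type]" => "table_a" }
}
```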
My problem is that the three queries are not executed sequentially but in parallel, or at least in quick succession. As a result the events arrive interleaved: a few events from table_a, then some from table_b, then back to table_a, and so on. This breaks the aggregate filter, which expects all events from table_a ordered by foo, then all events from table_b ordered by foo, etc.
I've been setting the pipeline worker count to 1 when running the pipeline, but on closer reading of the documentation I realized that this setting only affects the filter and output stages, not the inputs.
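(For reference, this is how I've been setting the worker count — either on the command line or in `logstash.yml`:)

```
# command line
bin/logstash -f pipeline.conf --pipeline.workers 1

# or equivalently in logstash.yml
pipeline.workers: 1
```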
I read that "Each input stage in the Logstash pipeline runs in its own thread." In my particular case, do I have other options besides running each input separately?
A similar question was asked before but didn't receive an answer.