Hi,
I've been wrestling with my Aggregate plugin configuration and it seems like it interacts poorly with the Logstash micro-batching framework in my case.
My logstash config file (filter section) looks roughly like this:
grok {
  match => { "message" => "^%{TIMESTAMP_ISO8601:local_timestamp}\s+%{GREEDYDATA:message}" }
  overwrite => "message"
}
grok {
  match => { "message" => "^Start task %{GREEDYDATA:task_id}" }
}
if "_grokparsefailure" not in [tags] {
  aggregate {
    task_id => "%{source}" # Note the use of a global task_id
    code => "map[:task_id] = event.get('task_id')"
  }
}
if [message] == "Finish task" {
  aggregate {
    task_id => "%{source}"
    code => "event.set('task_id', map[:task_id])"
  }
}
Now, my problem is that the way this configuration seems to be processed is:
First, the grok filter that extracts the message is applied to a whole batch of 125 events
Second, the grok filter that finds 'Start task' is applied to the same batch
Third, the first aggregate filter is applied to the same batch
Finally, the second aggregate filter is applied, again to the whole batch
This per-filter batching gives the effect of events being seen out of order, similar to what happens if there are multiple pipeline workers (I only have one). In the example below, both 'Finish task' events receive the task_id of 'Task2'.
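To illustrate, an input along these lines shows the problem (the timestamps and task names are made-up placeholders for illustration):

2021-03-01T10:00:00 Start task Task1
2021-03-01T10:00:05 Finish task
2021-03-01T10:00:10 Start task Task2
2021-03-01T10:00:15 Finish task

When all four lines land in the same batch, the first aggregate filter has already stored Task1 and then Task2 into map[:task_id] before the second aggregate filter runs over the batch, so both 'Finish task' events end up with Task2.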
I have found two different solutions/workarounds to my issue:
Set pipeline.batch.size to 1 in logstash.yml to force events to be processed one at a time (see the snippet after this list)
Wrap the entire filter section in a giant 'if [message] =~ /^/', which seems to force the logstash internals to treat the entire block as one serialized filter
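Concretely, the first workaround amounts to the following logstash.yml settings (pipeline.workers is already 1 in my setup, shown here only for completeness):

pipeline.workers: 1
pipeline.batch.size: 1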
This gives rise to my questions:
Am I missing something obvious?
Are there any other ways around the issue?
What is the likely performance impact of changing the batch size?
Is the workaround of a global 'if' statement likely to stop working at some point?
Should the aggregate plugin documentation mention this limitation, similar to the way that there is a prominent warning about only having 1 worker?
For the record, setting pipeline.batch.size to 1 gave terrible performance, so I have stuck with my workaround of wrapping the entire set of filters in a large
if [message] != "" {
...
}
to force the filters to be applied serially to each incoming message.
I already only have 1 pipeline worker, so I'm not running into the documented parallelisation problem with multiple workers. My issue is that filters are being applied in turn to each batch of events.
For a Batch (Bn) of Events (En0..Enm) and a set of Filters F0, F1, F2, I'm observing
F0(B0), F1(B0), F2(B0), F0(B1), F1(B1), F2(B1), F0(B2), ...
whereas I need to have
F0(E00), F1(E00), F2(E00), F0(E01), F1(E01), F2(E01), F0(E02), ...
Since I'm using stateful filters (via the aggregate filter), the correct treatment of F0(E02) depends on F2(E00) having been processed first, but by default each individual filter appears to be applied, in order, to a whole batch of events: F0 to the first batch, then F1 to the first batch, then F2 to the first batch, then F0 to the second batch, and so on. I need a way to force strict in-order processing of each event through all the filters in the configuration file, and wrapping the entire filter block in a big 'if' statement seems to work around this correctly for the moment.
This type of processing tends to be slow and does not scale well, as all related events need to be processed in a single thread.
A more scalable option might be to have an external process/script that periodically searches the indexed data and updates events that are not yet complete. This would let Logstash run unconstrained and is likely to scale much better.
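To make that a bit more concrete, here is a rough, untested sketch of what such an external enrichment script could look like. It assumes the events are indexed in Elasticsearch under a logstash-* index pattern with @timestamp, source, message and task_id fields; the index pattern and connection URL are assumptions, not taken from the post. It is written in Ruby since that is what the aggregate code blocks already use:

# Rough sketch only: periodically enrich 'Finish task' events that do not
# yet have a task_id by looking up the preceding 'Start task' event from
# the same source. Index name and field names are assumptions.
require 'elasticsearch'

client = Elasticsearch::Client.new(url: 'http://localhost:9200')

# Find 'Finish task' events that are still missing a task_id.
pending = client.search(
  index: 'logstash-*',
  body: {
    size: 100,
    query: {
      bool: {
        must:     [ { match_phrase: { message: 'Finish task' } } ],
        must_not: [ { exists: { field: 'task_id' } } ]
      }
    }
  }
)

pending['hits']['hits'].each do |hit|
  finish = hit['_source']

  # Most recent 'Start task' event from the same source before this finish.
  start = client.search(
    index: 'logstash-*',
    body: {
      size: 1,
      sort: [ { '@timestamp' => { order: 'desc' } } ],
      query: {
        bool: {
          must: [
            { term:   { 'source' => finish['source'] } },
            { exists: { field: 'task_id' } },
            { range:  { '@timestamp' => { lte: finish['@timestamp'] } } }
          ]
        }
      }
    }
  )

  match = start['hits']['hits'].first
  next if match.nil?

  # Copy the task_id onto the still-incomplete 'Finish task' event.
  client.update(
    index: hit['_index'],
    id:    hit['_id'],
    body:  { doc: { task_id: match['_source']['task_id'] } }
  )
end

Whether this is worth the extra moving parts depends on how quickly the enriched task_id needs to be available, since the events are only completed on the script's polling interval rather than at ingest time.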