Aggregate plugin - problem with micro-batching

Hi,
I've been wrestling with my Aggregate plugin configuration and it seems like it interacts poorly with the Logstash micro-batching framework in my case.

I have log lines similar to this (simplified):

2018-03-21 10:05:24,305 Start task Task1
2018-03-21 10:05:24,306 Finish task
2018-03-21 10:05:24,307 Start task Task2
2018-03-21 10:05:24,308 Finish task

My logstash config file (filter section) looks roughly like this:

grok {
  match => { "message" => "^%{TIMESTAMP_ISO8601:local_timestamp}\s+%{GREEDYDATA:message}" }
  overwrite => [ "message" ]
}
grok {
  match => { "message" => "^Start task %{GREEDYDATA:task_id}" }
}
if "_grokparsefailure" not in [tags] {
  aggregate {
    task_id => "%{source}" # Note the use of a global task_id
    code => "map[:task_id] = event.get('task_id')"
  }
}
if [message] == "Finish task" {
  aggregate {
    task_id => "%{source}"
    code => "event.set('task_id', map[:task_id])"
  }
}

Now my problem is with how this configuration seems to be processed:

  1. The first grok filter (extract message) is applied to a batch of 125 items
  2. The second grok filter (find 'Start task') is applied to the same batch
  3. The first aggregate filter is applied to the same batch
  4. Finally, the second aggregate filter is applied, again to the whole batch

This batching of events per-filter gives the effect of items being seen out-of-order, similar to what happens if there are multiple pipeline workers (I only have one). In the example, both 'Finish' events receive the task_id of 'Task2'.
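
To spell out what I believe happens with the four example lines above (this is my reading of the order of operations, not a verified trace of the Logstash internals):

grok#1      runs over all 4 events (strips the timestamp)
grok#2      runs over all 4 events (the two 'Finish task' events get _grokparsefailure)
aggregate#1 runs over the two 'Start task' events: map[:task_id] becomes "Task1", then "Task2"
aggregate#2 runs over the two 'Finish task' events: both read map[:task_id], which is already "Task2"

So by the time the first 'Finish task' event reaches the second aggregate filter, the map has already been overwritten by the second 'Start task' event.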

I have found two different solutions/workarounds to my issue:

  1. Set pipeline.batch.size to 1 in logstash.yml to force events to be processed one at a time (see the snippet just below)
  2. Wrap the entire filter section in a giant 'if [message] =~ /^/', which seems to force the Logstash internals to treat the entire block as one serialized filter
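
For reference, workaround 1 is just this one setting in logstash.yml:

# logstash.yml (workaround 1): hand events to the filters one at a time
pipeline.batch.size: 1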

This gives rise to my questions:

  • Am I missing something obvious?
  • Are there any other ways around the issue?
  • What is the likely performance impact of changing the batch size?
  • Is the workaround of a global 'if' statement likely to stop working at some point?
  • Should the aggregate plugin documentation mention this limitation, similar to the way that there is a prominent warning about only having 1 worker?

Thanks

For the record, setting pipeline.batch.size to 1 gave terrible performance, so I have stuck with my workaround of wrapping the entire set of filters in a large

if [message] != "" {
  ...
}

to force the filters to be applied serially to each incoming message.

I suspect you should be able to use the default batch size as long as you set pipeline.workers to 1 for that pipeline.
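
If you run multiple pipelines, that would look something like this in pipelines.yml (the pipeline id and config path are just placeholders):

# pipelines.yml
- pipeline.id: aggregate_pipeline
  pipeline.workers: 1
  path.config: "/etc/logstash/conf.d/aggregate.conf"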

Hi Christian,

Thanks for looking at my problem.

I already only have 1 pipeline worker, so I'm not running into the documented parallelisation problem with multiple workers. My issue is that filters are being applied in turn to each batch of events.

For a Batch (Bn) of Events (En0..Enm) and a set of Filters F0, F1, F2 I'm observing
F0(B0), F1(B0), F2(B0), F0(B1), F1(B1), F2(B1), F0(B2), ...
whereas I need to have
F0(E00), F1(E00), F2(E00), F0(E01), F1(E01), F2(E01), F0(E02), ...

Since I'm using stateful filters (via the aggregate filter), the correct treatment of F0(E01) depends on F2(E00) having been processed first, but by default individual filters appear to be applied, in order, to whole batches of events: F0 to the first batch, then F1 to the first batch, then F2 to the first batch, then F0 to the second batch, and so on. I need a way to force strict in-order processing of events through the filters in the configuration file, and wrapping the entire filter block in a big 'if' statement seems to work around this correctly for the moment.
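
In pseudo-Ruby (purely illustrative, not the actual Logstash code), the difference is just the loop nesting:

# Stand-in filters and events, only to show the two orderings
filters = [->(e) { puts "F0(#{e})" },
           ->(e) { puts "F1(#{e})" },
           ->(e) { puts "F2(#{e})" }]
batch = ["E00", "E01"]

# What I observe: each filter runs over the whole batch before the next filter
filters.each { |f| batch.each { |e| f.call(e) } }
# => F0(E00) F0(E01) F1(E00) F1(E01) F2(E00) F2(E01)

# What my aggregate logic needs: each event passes through every filter first
batch.each { |e| filters.each { |f| f.call(e) } }
# => F0(E00) F1(E00) F2(E00) F0(E01) F1(E01) F2(E01)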

This type of processing tends to be slow and does not scale well, as all related events need to be processed in a single thread.

A more scalable option might be to have an external process or script that periodically searches the indexed data and updates events that are not yet complete. This would allow Logstash to work without these constraints and is likely to scale much better.
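
Just to sketch what that could look like (the index name, field names and matching logic here are my assumptions, not something from this thread), a periodically run script along these lines could backfill the task_id on 'Finish task' events after the fact:

require 'elasticsearch'

client = Elasticsearch::Client.new(url: 'http://localhost:9200')

# Find 'Finish task' events that have not been assigned a task_id yet
finishes = client.search(index: 'logstash-*', body: {
  size: 100,
  query: {
    bool: {
      must:     [{ match_phrase: { message: 'Finish task' } }],
      must_not: [{ exists: { field: 'task_id' } }]
    }
  }
})

finishes['hits']['hits'].each do |finish|
  src = finish['_source']

  # Most recent 'Start task' event from the same source at or before this event
  start = client.search(index: 'logstash-*', body: {
    size: 1,
    sort: [{ '@timestamp' => 'desc' }],
    query: {
      bool: {
        must: [
          { match_phrase: { message: 'Start task' } },
          { term: { 'source.keyword' => src['source'] } },
          { range: { '@timestamp' => { lte: src['@timestamp'] } } }
        ]
      }
    }
  })['hits']['hits'].first
  next unless start

  # Copy the task_id onto the 'Finish task' document
  client.update(index: finish['_index'], id: finish['_id'],
                body: { doc: { task_id: start['_source']['task_id'] } })
end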

I plan on addressing any scaling issues by having multiple Logstash instances.

How? Have you changed the approach to how you handle aggregation?
