Routing events to specific workers

When Logstash runs with multiple pipeline workers, events are handed out to each worker in turn as they become available. Certain plugins (e.g. aggregate, but also a few others that do in-memory caching) require the user to set a single worker (-w 1) so that events are not processed out of sequence. It's a shame to have 2+ CPUs and only be able to use one of them because of this limitation.

I'm wondering whether we could avoid this by introducing a way to route events to a specific worker, much like the routing parameter in Elasticsearch routes documents to specific shards, or like the partitioning key in Kafka. The idea would be to introduce a base-level routing parameter that could be added to any input and whose hashed value would tell Logstash which worker an event should be routed to (worker_id = hash(routing) % pipeline.workers). This value could be static or could come from a field of the event. In the latter case, the field would need to be available before the filter phase, which might not be the case for inputs that produce plain events by default, but that could be worked around by using the json codec instead.
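
To make this concrete, here is a minimal sketch in Python of the kind of dispatch logic I have in mind. This is purely illustrative and not actual Logstash internals; worker_for, dispatch, and the queue-per-worker layout are all hypothetical.

import queue
import zlib

def worker_for(routing_value, num_workers):
    # worker_id = hash(routing) % pipeline.workers
    # zlib.crc32 is used because it is stable across runs and processes,
    # unlike Python's built-in hash() for strings.
    return zlib.crc32(str(routing_value).encode("utf-8")) % num_workers

def dispatch(event, worker_queues, routing_field):
    # All events sharing the same routing key map to the same worker
    # queue, so they are processed by one thread, in arrival order.
    key = event.get(routing_field, "")
    worker_queues[worker_for(key, len(worker_queues))].put(event)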

Let's consider a file containing JSON logs such as

{"timestamp": "2017-01-19T10:32:14.123Z", "deviceId": 123, "message": "Device has been powered on"}
{"timestamp": "2017-01-19T10:32:17.234Z", "deviceId": 123, "message": "Button 1 pressed"}
{"timestamp": "2017-01-19T10:34:19.432Z", "deviceId": 123, "message": "Button 2 pressed"}
{"timestamp": "2017-01-19T10:36:02.583Z", "deviceId": 456, "message": "Device has been powered on"}
{"timestamp": "2017-01-19T10:37:12.927Z", "deviceId": 456, "message": "Button 1 pressed"}
{"timestamp": "2017-01-19T10:38:33.583Z", "deviceId": 456, "message": "Button 2 pressed"}

A file input consuming those logs could be made "worker-aware" by routing the events based on the deviceId field like this

input {
  file {
    path => "/path/to/file.json"
    codec => "json"
    routing => "%{deviceId}"   # proposed new setting (doesn't exist today)
  }
}

From that point on, we can guarantee that all events for device 123 are always handled by the same worker thread, and the same goes for device 456.
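
Continuing the hypothetical sketch above with the sample logs, the guarantee is easy to see (the actual worker indices are illustrative and depend on the hash and the worker count):

workers = [queue.Queue() for _ in range(4)]  # e.g. pipeline.workers: 4

for event in ({"deviceId": 123, "message": "Device has been powered on"},
              {"deviceId": 456, "message": "Device has been powered on"},
              {"deviceId": 123, "message": "Button 1 pressed"}):
    dispatch(event, workers, "deviceId")

# Both deviceId-123 events now sit in the same queue; the 456 event may
# share that queue or land in another, but never splits across workers.
print(worker_for(123, 4), worker_for(456, 4))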

This change would allow users

  1. to stop worrying about whether they need to run with a single worker thread
  2. to leverage the full power of their server (i.e. all CPUs) if they wish to

I saw that it will soon be possible to define several pipelines inside a single Logstash instance, but that won't solve the issue I'm describing here.

Does anyone have any thoughts on this? Is there a similar effort going on internally?

I'd suggest you also raise this as a feature/enhancement on the main Logstash repo - github.com/elastic/logstash/issues 🙂

Thanks for your input, Mark.
I've filed it here: https://github.com/elastic/logstash/issues/6572
We'll see what unfolds 😉

