Using Logstash aggregate filter with only start and continuation events

Hey there,

I am attempting to merge the "message" field of multiline syslog logs using the aggregate filter, and have been beating my head against a wall trying to get this scenario to work. I'm on the verge of giving up and implementing this with redis and ruby filters instead. Maybe someone can suggest where I'm going off the rails.

I'm ingesting logs using rsyslogd and its omkafka module, sending log events to Kafka using message keys based on the source machine's hostname.

The Kafka topic has exactly as many partitions as I've allocated worker threads, and I have verified that each Kafka input on my logstash machines is assigned to one and only one partition.
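
For illustration, a minimal sketch of that kind of kafka input (the broker address, topic name, and group id here are just placeholders, not my actual values):

    input {
        kafka {
            bootstrap_servers => "kafka01:9092"       # placeholder broker address
            topics            => ["syslog"]           # placeholder topic name
            group_id          => "logstash-syslog"    # placeholder consumer group shared by every input
            consumer_threads  => 1                    # one consumer thread, so this input owns a single partition
        }
    }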

Not every log I'm processing comes from a process that can generate multiple lines of output. The ones that do are tracked in a Redis set (via a Ruby filter). I've added logic to my filters to make sure that the aggregate filters can't even run unless they're being applied to a program that does in fact generate multiple log lines.
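
Roughly, that gate looks like the following sketch, assuming the redis rubygem is available to Logstash's Ruby runtime, the syslog program name is in a "program" field, and the set is called "multiline_programs" (all of those are assumptions):

    ruby {
        init => '
            require "redis"
            # Assumed local Redis instance; adjust host/port as needed
            @redis = Redis.new(host: "localhost")
        '
        code => '
            # Only tag events from programs known to emit multiline output
            program = event.get("program")
            if program && @redis.sismember("multiline_programs", program)
                event.tag("multiline_capable")
            end
        '
    }

    # ...and the aggregate filters are wrapped in a conditional like:
    if "multiline_capable" in [tags] {
        # aggregate filters go here
    }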

In my case, it's easy to tell the difference between "start" events and what I am calling "continuation" events for a given process: the "message" field in a start event doesn't begin with "-->", while in a continuation event it does.
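
In Logstash conditional terms that distinction is just the following (a trivial sketch; the tag names are arbitrary):

    if [message] =~ /^-->/ {
        mutate { add_tag => [ "continuation" ] }
    } else {
        mutate { add_tag => [ "start" ] }
    }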

Here's my problem.

  1. There is no "end log" event. The same process from the same host logging to the same Kafka partition just sends a new "start event".
  2. I can't find a way to explicitly and asynchronously flush the aggregate filter's map to Elasticsearch when a new start event is detected. It seems that the map is just overwritten by the next event without ever completing the pipeline and being sent on to the Elasticsearch output.
  3. I'm not concerned about timeouts, since all of the messages for any given multiline log are sent within milliseconds of each other. I am, however, concerned about the existing aggregated map being lost when a new start event is detected, which seems to be what's happening.

Frankly I find the examples in the documentation not especially helpful for this kind of log. It may be that I'm trying to do something this filter just can't do, in which case I will build an asynchronous method of trapping and aggregating these events in Redis and running another process periodically to flush the completed ones. But if someone in the know could tell me whether what I'm attempting with the aggregate filter is even achievable, I would very much appreciate it.

OK, I spent some time looking at this. At one point my "solution" had three aggregate filters, two ruby filters, a global variable (not local, not instance, not class, but global), as well as calls to event.clone and new_event_block.call to create new events. :rofl:

I don't think logstash can do what you want.

For a single file like this

    First event
    -> and continuation thereof
    Second event
    -> and it has two
    -> continuation lines
    Third event
    Fourth event
    -> and another continuation
    -> and yet more

then, provided that you run with a single pipeline worker (pipeline.workers: 1) and set pipeline.ordered to true, you can use this code

    ruby {
        init => '@taskId = 0'
        code => '
            # If it is a new message, start a new task and let the old one time out
            if event.get("message") !~ /^->/ ; @taskId += 1; end

            event.set("[@metadata][taskId]", @taskId)
        '
    }

    aggregate {
        task_id => "%{[@metadata][taskId]}"
        code => '
            map["message"] ||= ""
            map["message"] += event.get("message") + ";;"
            event.cancel
        '
        timeout => 5
        push_map_as_event_on_timeout => true
    }

which, when the timeout fires, will produce four events:

    {
        "@timestamp" => 2024-10-16T18:33:35.402886759Z,
          "@version" => "1",
           "message" => "First event;;-> and continuation thereof;;"
    }
    {
        "@timestamp" => 2024-10-16T18:33:35.403333665Z,
          "@version" => "1",
           "message" => "Second event;;-> and it has two;;-> continuation lines;;"
    }
    {
        "@timestamp" => 2024-10-16T18:33:35.403382449Z,
          "@version" => "1",
           "message" => "Third event;;"
    }
    {
        "@timestamp" => 2024-10-16T18:33:35.403404598Z,
          "@version" => "1",
           "message" => "Fourth event;;-> and another continuation;;-> and yet more;;"
    }

Note that when you use push_map_as_event_on_timeout it just pushes the contents of the map, so if you need anything else from the events containing the log messages then you need to add them to the map too.
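
For example, to carry the source host along, the aggregate block above could copy it into the map as well (a sketch; the "host" field name is an assumption about your events):

    aggregate {
        task_id => "%{[@metadata][taskId]}"
        code => '
            map["message"] ||= ""
            map["message"] += event.get("message") + ";;"
            # Copy any other fields you need into the map so they survive the timeout flush
            map["host"] ||= event.get("host")
            event.cancel
        '
        timeout => 5
        push_map_as_event_on_timeout => true
    }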

As I said, this will work for a single input, but with multiple inputs it may not. Each input fills a pipeline batch with events and then flushes it. If one input splits the events of a single multiline log across two batches, another input may flush a batch in between them, and at that point your events are no longer ordered correctly.