Processing and comparing events in batches

I have a setup in which logstash-forwarder sends log events to Logstash. Instead of applying filters to each log event individually, I want to apply them to a batch of events.
Here a batch of events is the collection of application log lines that share a trace ID.
For example:

  1. x.x.x.x 123 ..
  2. x.x.x.x 123
  3. x.x.x.x 456

Here 123 and 456 are my trace IDs. I want to evaluate each batch of logs that share the same trace ID: if the batch satisfies my criterion, I need to send the entire batch to Elasticsearch; otherwise, drop the entire batch.
How can I accomplish this in Logstash?

You might be able to do it using an aggregate filter:

    aggregate {
        task_id => "%{traceId}"
        code => '
            # Collect every message for this traceId in the map
            map["msgs"] ||= [] # Or "", whatever floats your boat.
            map["msgs"] << event.get("message")
            # Cancel the original event; only the map is emitted, at timeout
            event.cancel
        '
        push_map_as_event_on_timeout => true
        timeout_task_id_field => "traceId"
        timeout => 5 # Or longer
    }

That will combine all the messages for a given value of traceId into one array, which you can then test; use a drop {} filter if you do not want to index the batch.
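A minimal sketch of that test, assuming (hypothetically) that a batch should be kept only if at least one of its messages contains the string "ERROR"; substitute your own criterion. It goes after the aggregate filter shown above, where the original events have already been cancelled, so what reaches it is the batch events pushed at timeout. The flag name [@metadata][keep_batch] is made up for illustration; any event without a msgs field would also be dropped by this check.

```
filter {
  # ... the aggregate filter shown above goes here ...

  # Hypothetical criterion: keep the batch if any message contains "ERROR"
  ruby {
    code => '
      msgs = event.get("msgs").to_a
      if msgs.any? { |m| m.to_s.include?("ERROR") }
        event.set("[@metadata][keep_batch]", true)
      end
    '
  }
  # Drop every batch that was not flagged above
  if ![@metadata][keep_batch] {
    drop {}
  }
}
```

Using [@metadata] keeps the flag out of the document sent to Elasticsearch.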

The usual caveat about aggregate applies: you have to use '--pipeline.workers 1' so that events are processed in order by a single worker, which means it does not scale.

@Badger, thanks for the response. Let me clarify things a little more.
After the timeout, will the resulting array contain only events from a single trace ID?
How do I test the array? I don't want to send a modified event; I want to send the exact events that came in.
I don't want my event to look like this:

    {
      "trace_id": "12345",
      "msgs": [
        { event1 },
        { event2 }
      ]
    }

I want to send event1 and event2 separately after testing.

If you decide to retain a set of events, you could use a split filter to convert that array into multiple events.
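A minimal sketch, assuming the batch arrives as one event with a msgs array (as produced by the aggregate filter above) and has already passed your test. The split filter emits one copy of the event per array element, with msgs holding a single message in each copy; the rename back to message is optional. Note that each resulting event carries only the message text and traceId, not the other fields of the original event.

```
filter {
  # One event per element of [msgs]
  split {
    field => "msgs"
  }
  # Optional: put the single message back under the usual field name
  mutate {
    rename => { "msgs" => "message" }
  }
}
```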

Should I do the test on the array in the timeout_code option?
Also, will this aggregate filter create a separate array for each ID, with each ID's timeout period starting when the first event with that ID is seen? @Badger

You could do it in the timeout_code, in a subsequent Logstash conditional, or in a subsequent ruby filter.
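A sketch of the timeout_code route, assuming the same hypothetical criterion (keep a batch only if some message contains "ERROR"). timeout_code runs on the event generated at timeout, which is pre-populated with the aggregation map, so the test can flag that event there and a conditional afterwards drops unflagged batches. The flag name [@metadata][keep_batch] is made up for illustration.

```
filter {
  aggregate {
    task_id => "%{traceId}"
    code => '
      map["msgs"] ||= []
      map["msgs"] << event.get("message")
      event.cancel
    '
    push_map_as_event_on_timeout => true
    timeout_task_id_field => "traceId"
    timeout => 5
    # Runs on the generated timeout event; flags batches meeting the criterion
    timeout_code => '
      if event.get("msgs").to_a.any? { |m| m.to_s.include?("ERROR") }
        event.set("[@metadata][keep_batch]", true)
      end
    '
  }
  # Drops any event not flagged above (the originals were already cancelled)
  if ![@metadata][keep_batch] {
    drop {}
  }
}
```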

The timeout of an aggregate is relative to the first event seen for a given id.
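Each distinct traceId value gets its own map, and therefore its own timer. If a batch should instead close once its trace ID goes quiet, the aggregate filter also has an inactivity_timeout option, measured from the last event seen for the task and required to be lower than timeout. A sketch with illustrative values only:

```
aggregate {
  task_id => "%{traceId}"
  code => '
    map["msgs"] ||= []
    map["msgs"] << event.get("message")
    event.cancel
  '
  push_map_as_event_on_timeout => true
  timeout_task_id_field => "traceId"
  # Hard limit: push the batch at most 60 s after the first event for this traceId
  timeout => 60
  # Push earlier if no new event for this traceId arrives for 5 s
  inactivity_timeout => 5
}
```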

@Badger, but when the timeout expires, push_map_as_event_on_timeout will push the aggregated event. I don't want that, so how should I break the flow?
Can you give a more relevant code snippet, like the one you gave above?

If you want the events to look exactly like the original events, you would have to save all of the fields of the original event in the maps, then reconstruct them. It's not a good solution.
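For completeness, a rough, untested sketch of what saving and reconstructing the original events could look like, again assuming the hypothetical criterion "some message contains ERROR". Each original event is stored in the map with event.to_hash, the pushed batch is tested and split, and a ruby filter copies the saved fields back to the top level. The field names events and [@metadata][keep_batch] are made up for illustration.

```
filter {
  aggregate {
    task_id => "%{traceId}"
    code => '
      # Save a copy of every field of the original event, then cancel it
      map["events"] ||= []
      map["events"] << event.to_hash
      event.cancel
    '
    push_map_as_event_on_timeout => true
    timeout_task_id_field => "traceId"
    timeout => 5
  }
  # Hypothetical criterion, evaluated on the whole batch
  ruby {
    code => '
      saved = event.get("events").to_a
      if saved.any? { |e| e["message"].to_s.include?("ERROR") }
        event.set("[@metadata][keep_batch]", true)
      end
    '
  }
  if ![@metadata][keep_batch] {
    drop {}
  }
  # One event per saved original
  split {
    field => "events"
  }
  # Copy the saved fields back to the top level and remove the wrapper field
  ruby {
    code => '
      saved = event.get("events")
      event.remove("events")
      saved.each { |k, v| event.set(k, v) unless k == "@version" }
    '
  }
}
```

This still requires a single pipeline worker, and the whole batch is held in memory until the timeout, which is part of why it is not a good solution.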

I suggest you consider alternative approaches. For example, ingest everything into a staging index, delete the batches you do not want and then ingest the staging index into the final destination.

What do you mean by a staging index?
Is it some kind of temporary buffer?

Another index in Elasticsearch.
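A sketch of the staging-index idea, assuming a local cluster and hypothetical index names staging-logs and app-logs (the file names in the comments are hypothetical too). The first pipeline indexes everything into the staging index. Unwanted batches are then removed from it, for example with Elasticsearch's delete-by-query API matching the trace IDs to discard. Finally, a second pipeline copies what remains into the final index.

```
# Pipeline 1 (e.g. staging.conf): index every event into the staging index
output {
  elasticsearch {
    hosts => ["localhost:9200"]
    index => "staging-logs"
  }
}

# Pipeline 2 (e.g. promote.conf), run after unwanted batches have been deleted:
# read what is left in the staging index and write it to the final index
input {
  elasticsearch {
    hosts => ["localhost:9200"]
    index => "staging-logs"
    query => '{ "query": { "match_all": {} } }'
  }
}
output {
  elasticsearch {
    hosts => ["localhost:9200"]
    index => "app-logs"
  }
}
```

Unlike the aggregate approach, this does not require a single pipeline worker, at the cost of indexing every event twice.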
