I have a setup in which logstash-forwarder is sending log events to Logstash. Instead of applying filters to a single log event, I want to apply filters to a batch of events.
Here a batch of events is basically a collection of application logs that share a trace id.
For Example:
x.x.x.x 123 ..
x.x.x.x 123
x.x.x.x 456
Here 123 and 456 are my trace ids. I want to evaluate a batch of logs with the same trace id, and if that batch satisfies the criterion, send the entire batch to Elasticsearch; otherwise drop the entire batch.
How can I accomplish this in Logstash?
An aggregate filter keyed on the trace id will combine all the messages for a given value of traceId into one array, which you can then test, using a drop {} filter if you do not want to index it.
The usual caveat about aggregate applies: you have to run with '--pipeline.workers 1', so it does not scale.
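The aggregate approach described above could be sketched like this (the grok pattern, the trace_id field name, the timeout value, and the ERROR criterion are all hypothetical; adapt them to your logs):

```
filter {
  grok {
    # Hypothetical pattern: extract the trace id from lines like "x.x.x.x 123 ..."
    match => { "message" => "%{IP:client_ip} %{NUMBER:trace_id}" }
  }
  aggregate {
    task_id => "%{trace_id}"
    code => "
      map['msgs'] ||= []
      map['msgs'] << event.get('message')
      event.cancel
    "
    push_map_as_event_on_timeout => true
    timeout => 120
    timeout_task_id_field => "trace_id"
    timeout_code => "
      # Hypothetical criterion: drop the batch if any of its
      # messages contains the word ERROR
      event.tag('drop_batch') if event.get('msgs').any? { |m| m.include?('ERROR') }
    "
  }
  if "drop_batch" in [tags] {
    drop {}
  }
}
```

Each original event is cancelled inside code, so only the aggregated event (one per trace id, carrying the msgs array) reaches the output, and the conditional drops it when the tag set in timeout_code is present.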
@Badger Thanks for the response. Let me clarify things a little more.
After the timeout, will the array contain only events from a single trace id?
How do I test the array? I don't want to send a modified event; I want to send the exact events that came in.
I don't want my event to look like this:
{
  "trace_id": "12345",
  "msgs": [
    { event1 },
    { event2 }
  ]
}
I want to send event1 and event2 separately after testing.
Should the test I need to run on the array go in the timeout_code field?
And will this aggregate filter create a separate array for each trace id, with each id's timeout period starting when the first event for that id is encountered? @Badger
@Badger But push_map_as_event_on_timeout will push the aggregated event when the timeout expires. I don't want that, so how should I break the flow?
Can you give a more concrete code snippet, like the one you gave above?
If you want the events to look exactly like the original events, you would have to save all of the fields of each original event in the map, then reconstruct them. It's not a good solution.
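For completeness, that save-and-reconstruct approach might look roughly like this (a sketch only, with hypothetical field names and criterion, and subject to the same caveat that it is not a good solution):

```
filter {
  aggregate {
    task_id => "%{trace_id}"
    code => "
      map['events'] ||= []
      map['events'] << event.to_hash   # save every field of the original event
      event.cancel
    "
    push_map_as_event_on_timeout => true
    timeout => 120
    timeout_code => "
      # Hypothetical criterion: cancel the pushed event (i.e. drop the
      # whole batch) unless it contains more than one event
      event.cancel unless event.get('events').length > 1
    "
  }
  # Re-emit one event per saved original; each element lands in [events]
  split { field => "events" }
  # Copy the saved fields back to the top level of each event
  ruby {
    code => "
      event.get('events').each { |k, v| event.set(k, v) }
      event.remove('events')
    "
  }
}
```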
I suggest you consider alternative approaches. For example, ingest everything into a staging index, delete the batches you do not want, and then reindex the staging index into the final destination.
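The staging-index approach could be sketched with the Elasticsearch _delete_by_query and _reindex APIs (the index names and the trace id value here are hypothetical):

```
# Remove the batches that fail the criterion from the staging index
POST logs-staging/_delete_by_query
{
  "query": { "term": { "trace_id": "456" } }
}

# Copy whatever remains into the final index
POST _reindex
{
  "source": { "index": "logs-staging" },
  "dest": { "index": "logs-final" }
}
```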