I asked for some aggregation code a while ago (Help with aggregation code) and finally had time to get back to this (I also changed the output concept a bit). My goal: aggregate data from multiple documents that share the same mail_id into a new document that gets sent to a specific index, while keeping the original documents and sending them to another index.
I'm using this filter now:
aggregate {
  task_id => "%{mail_id}"
  timeout_task_id_field => "mail_id"
  push_map_as_event_on_timeout => true
  code => '
    map["@timestamp"] ||= event.get("@timestamp")
    map["subject"] ||= event.get("subject")
    map["route_nr"] ||= event.get("route_nr")
    map["spam_punkte"] ||= event.get("spam_punkte")
  '
  timeout_timestamp_field => "@timestamp"
  timeout => 5
  timeout_tags => [ "aggregate_timeout" ]
}
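For context, the plan for the output side is to route on the timeout tag, roughly like this (the index names here are just placeholders, not my real ones):

output {
  if "aggregate_timeout" in [tags] {
    # aggregated summary documents go to their own index
    elasticsearch {
      hosts => ["localhost:9200"]
      index => "mail-aggregated"   # placeholder name
    }
  } else {
    # original documents are kept and indexed separately
    elasticsearch {
      hosts => ["localhost:9200"]
      index => "mail-originals"    # placeholder name
    }
  }
}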
First of all, the results are very inconsistent: if I write the output to file1, file2, file3, ... in consecutive runs, each file ends up with a different total number of lines and aggregations. I run Logstash with -w 1, which is also reflected at the beginning of each run by "pipeline.workers"=>1, so multiple pipeline workers should not be the issue. The input is a static JSON file, so it doesn't change between runs either.
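For reference, this is the full invocation (the config path is just an example):

  bin/logstash -f /etc/logstash/conf.d/mail-aggregate.conf -w 1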
Then, most of the aggregations look like this:
{
    "spam_punkte" => 2.6,
        "subject" => "redacted",
       "route_nr" => "60962",
           "tags" => [
        [0] "aggregate_timeout",
        [1] "_aggregatefinalflush"
    ],
       "@version" => "1",
        "mail_id" => "2276982498",
     "@timestamp" => 2023-09-07T09:09:41.282Z
}
Generally this is everything I want for the new document, but the _aggregatefinalflush tag results in everything being flushed into any further tag-filtered output, e.g. with if "aggregate_timeout" in [tags], but also with if "_aggregatefinalflush" in [tags].
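To illustrate, this is the kind of conditional output I've been testing with (the file paths are just examples):

output {
  if "aggregate_timeout" in [tags] {
    # expected: only the aggregated documents
    file { path => "/tmp/aggregated.json" }
  } else {
    # expected: only the original documents
    file { path => "/tmp/originals.json" }
  }
}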
When I do get some tag-filtered output, it is only a few documents with missing data, because the related documents did not arrive within the timeout window. From my understanding, push_map_as_event_on_timeout => true should prevent exactly this?
I've been working on this for hours now and just can't figure out what I'm missing. Please help.