Aggregate - Output issues

I asked for some aggregation code a while ago (Help with aggregation code) and now finally had the time to get back to this (and changed the output concept a bit). I want to aggregate data from multiple documents that share the same mail_id and create a new document to send to a specific index, while also keeping the original documents and sending them to another index.
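To make the goal concrete, the output routing I have in mind looks roughly like this (a sketch; the hosts and index names are placeholders, not my actual values):

    output {
      if "aggregate_timeout" in [tags] {
        # aggregated summary document created by the aggregate filter
        elasticsearch {
          hosts => ["http://localhost:9200"]
          index => "mails-aggregated"
        }
      } else {
        # original per-mail documents
        elasticsearch {
          hosts => ["http://localhost:9200"]
          index => "mails-raw"
        }
      }
    }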

I'm using this filter now:

    aggregate {
      # all events with the same mail_id belong to one aggregation task/map
      task_id => "%{mail_id}"
      # write the task id into the generated timeout event as "mail_id"
      timeout_task_id_field => "mail_id"
      # push the map as a new event when the task times out
      push_map_as_event_on_timeout => true
      code => '
        map["@timestamp"]  ||= event.get("@timestamp")
        map["subject"]     ||= event.get("subject")
        map["route_nr"]    ||= event.get("route_nr")
        map["spam_punkte"] ||= event.get("spam_punkte")
      '
      # compute the timeout from event timestamps instead of system time
      timeout_timestamp_field => "@timestamp"
      timeout => 5
      timeout_tags => [ "aggregate_timeout" ]
    }

First of all, the results are very inconsistent: if I write the output to file1, file2, file3, and so on in consecutive runs, each file has a different number of total lines and aggregations. I run Logstash with -w 1, which is also reflected at the start of a run by "pipeline.workers"=>1, so worker count should not be the issue. The input is a static JSON file, so the data doesn't change either.
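For reference, the equivalent settings in logstash.yml would be something like this (assuming Logstash 7.7 or later, where pipeline.ordered exists):

    pipeline.workers: 1
    # "auto" (the default) already preserves event order when workers == 1,
    # but it can also be forced explicitly:
    pipeline.ordered: true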

Then, most of the aggregations look like this:

{
    "spam_punkte" => 2.6,
        "subject" => "redacted",
       "route_nr" => "60962",
           "tags" => [
        [0] "aggregate_timeout",
        [1] "_aggregatefinalflush"
    ],
       "@version" => "1",
        "mail_id" => "2276982498",
     "@timestamp" => 2023-09-07T09:09:41.282Z
}

Generally this is everything I want for the new document, but since every aggregated event also carries _aggregatefinalflush, everything ends up being flushed into any further tag-filtered output, e.g. with if "aggregate_timeout" in [tags] but also with if "_aggregatefinalflush" in [tags].

When I do get some tag-filtered output, it's only a few documents, and they have missing data because the related documents did not arrive within the timeout window. From my understanding, push_map_as_event_on_timeout => true should prevent exactly this?

I've been working on this for hours now and just can't figure out what I'm missing. Please help.

I've figured out that push_map_as_event_on_timeout is definitely causing the issues, because it works with push_previous_map_as_event. To test this, I created another config with elasticsearch as input, querying already indexed documents sorted by mail_id so they are correctly ordered for push_previous_map_as_event. With that, I get the aggregated documents without every single one being tagged with _aggregatefinalflush, and they are properly written to an index.
For the moment this is better than nothing, but given the non-continuous operation of the elasticsearch input and the data loss between scheduled runs, I'd really like to make this work in the regular pipeline.
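For illustration, the input side of that test config looks something like this (the host, index name, and schedule are placeholder values):

    input {
      elasticsearch {
        hosts => ["http://localhost:9200"]
        index => "mails-raw"    # hypothetical source index
        # sort by mail_id so related documents arrive back to back,
        # which is what push_previous_map_as_event relies on
        query => '{ "query": { "match_all": {} }, "sort": [ { "mail_id": "asc" } ] }'
        schedule => "*/15 * * * *"    # hypothetical cron schedule
      }
    }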

It kinda looks like it has something to do with the task_id, because debugging with push_previous_map_as_event shows lines like this:

    Aggregate create_timeout_event call with task_id '2276983772'

While with push_map_as_event_on_timeout there are no IDs, just the unresolved pattern:

    Aggregate remove_expired_map_based_on_event_timestamp call with task_id : '%{mail_id}'
