Aggregate filter - push on event size limit

Hi there,

I would like to aggregate events until the aggregated event reaches a certain size limit (e.g. 256kb) at which point I would like to push the event to the queue. There are no start/end events.
Push on timeout does not solve my problem as hundreds of events may unpredictably arrive in a very short time interval. Is there a way to do that?

Thanks,
bosand

1 Like

At first I thought not, but there is a distinctly non-scaleable way to do it. In addition to the usual requirement of "--pipeline.workers 1" you need "--pipeline.batch.size 1" so that every event goes through the second aggregate filter before the first aggregate filter processes another event.

This is just a proof-of-concept that demonstrates how it could be done.

filter { json { source => "message" } }

input { stdin {} }
input { generator { count => 10 lines => [ '{ "id": "a", "data": "123456789012345678901234567890123456789012345678901234567890"}' ] } }
filter {
    aggregate {
        task_id => "%{id}"
        code => '
            map["task"] ||= ""
            map["task"] += event.get("data")
            if map["task"].bytesize > 200
                event.set("[@metadata][timeToFlush]", true)
            end
        '
        push_map_as_event_on_timeout => true
        timeout => 10
        timeout_task_id_field => "id"
        timeout_code => '
            event.set("[@metadata][timeToFlush]", true)
        '
    }
    if [@metadata][timeToFlush] {
        aggregate {
            task_id => "%{id}"
            code => '
                event.set("task", map["task"])
                map["task"] = ""
            '
            map_action => "update"
            end_of_task => true
        }
    } else {
        drop {}
    }
}

The stdin generator is just there to prevent logstash shutting down the pipeline when the generator input finishes. If you remove that when using a generator input you would not get the timeout. For almost any other input it is not needed or useful.

1 Like

This is a nice trick.
Thanks Badger!

This topic was automatically closed 28 days after the last reply. New replies are no longer allowed.