Aggregate filter - push on event size limit

At first I thought not, but there is a distinctly non-scalable way to do it. In addition to the usual requirement of "--pipeline.workers 1", you need "--pipeline.batch.size 1" so that every event goes through the second aggregate filter before the first aggregate filter processes the next event.
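For example, assuming the configuration below is saved as aggregate-poc.conf (a file name chosen here just for illustration), it could be run with something like:

bin/logstash --pipeline.workers 1 --pipeline.batch.size 1 -f aggregate-poc.conf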

This is just a proof-of-concept that demonstrates how it could be done.

input { generator { count => 10 lines => [ '{ "id": "a", "data": "123456789012345678901234567890123456789012345678901234567890"}' ] } }
input { stdin {} }

filter { json { source => "message" } }
filter {
    # First aggregate: append each event's [data] to the map for this task
    # and flag the event once the accumulated string exceeds 200 bytes.
    aggregate {
        task_id => "%{id}"
        code => '
            map["task"] ||= ""
            map["task"] += event.get("data")
            if map["task"].bytesize > 200
                event.set("[@metadata][timeToFlush]", true)
            end
        '
        push_map_as_event_on_timeout => true
        timeout => 10
        timeout_task_id_field => "id"
        timeout_code => '
            event.set("[@metadata][timeToFlush]", true)
        '
    }
    if [@metadata][timeToFlush] {
        # Second aggregate: when the size limit is hit, move the accumulated
        # data onto the flagged event and end the task.
        aggregate {
            task_id => "%{id}"
            code => '
                event.set("task", map["task"])
                map["task"] = ""
            '
            map_action => "update"
            end_of_task => true
        }
    } else {
        # Events that did not trigger a flush have already been folded into
        # the map, so drop them.
        drop {}
    }
}
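The configuration has no output section; to see the flushed events you could append something like a stdout output with the rubydebug codec:

output { stdout { codec => rubydebug } }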

The stdin input is just there to prevent Logstash from shutting down the pipeline when the generator input finishes. If you remove it, then when using a generator input you will not get the timeout. For almost any other input it is neither needed nor useful.
