At first I thought not, but there is a distinctly non-scalable way to do it. In addition to the usual requirement of "--pipeline.workers 1", you also need "--pipeline.batch.size 1", so that every event goes through the second aggregate filter before the first aggregate filter processes another event.
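For example, assuming the configuration below is saved as concat.conf (the filename is arbitrary), it could be run with something like

bin/logstash --pipeline.workers 1 --pipeline.batch.size 1 -f concat.conf

Both options can also be set as pipeline.workers and pipeline.batch.size in the pipeline settings instead of on the command line.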
This is just a proof-of-concept that demonstrates how it could be done.
input { stdin {} }
input { generator { count => 10 lines => [ '{ "id": "a", "data": "123456789012345678901234567890123456789012345678901234567890"}' ] } }
filter { json { source => "message" } }
filter {
    # First aggregate: accumulate "data" for each id and flag the event for a
    # flush once the accumulated string exceeds 200 bytes, or when the timeout fires.
    aggregate {
        task_id => "%{id}"
        code => '
            map["task"] ||= ""
            map["task"] += event.get("data")
            if map["task"].bytesize > 200
                event.set("[@metadata][timeToFlush]", true)
            end
        '
        push_map_as_event_on_timeout => true
        timeout => 10
        timeout_task_id_field => "id"
        timeout_code => '
            event.set("[@metadata][timeToFlush]", true)
        '
    }
    # Second aggregate: when a flush has been flagged, copy the accumulated string
    # onto the event and reset the map; otherwise drop the partial event.
    if [@metadata][timeToFlush] {
        aggregate {
            task_id => "%{id}"
            code => '
                event.set("task", map["task"])
                map["task"] = ""
            '
            map_action => "update"
            end_of_task => true
        }
    } else {
        drop {}
    }
}
The stdin input is just there to prevent Logstash from shutting down the pipeline when the generator input finishes. Without it, the pipeline would shut down as soon as the generator finished and the timeout would never fire. For almost any other input it is neither needed nor useful.
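To see the flushed events while testing, an output section could be added (not part of the configuration above; stdout with the rubydebug codec is just one convenient choice):

output { stdout { codec => rubydebug } }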