That would require Logstash to look into the future and predict what events will occur after the current event. Tricky.
I think the best you can do is to use push_map_as_event_on_timeout on an aggregate filter.
aggregate {
  task_id => "%{users}"
  code => '
    map["completion"] ||= 0
    c = event.get("completion")
    # Guard against events that lack a [completion] field (nil would raise on >)
    if c && c > map["completion"]
      map["completion"] = c
    end
    event.cancel
  '
  push_map_as_event_on_timeout => true
  timeout_task_id_field => "users"
  timeout => 600 # 10 minute timeout
}
Note that the event that is created will only contain the fields you add to the map, so in this case it will have [completion] and [users] (the latter because timeout_task_id_field is set). If there are other fields you want to preserve, add them to the map as well.
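For example, to carry an extra field through to the generated event you would copy it into the map inside the code block (here [status] is just a hypothetical field name, not something from your data):

map["status"] ||= event.get("status")

Any key you set on the map this way becomes a field on the event that push_map_as_event_on_timeout creates.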
If the [users] field is only output when it changes then you could use a ruby filter to add it back in. You need order preserved, so make sure that pipeline.workers is 1 and pipeline.ordered evaluates to true.
ruby {
  init => '@user = nil'
  code => '
    user = event.get("users")
    if user
      @user = user
    else
      event.set("users", @user)
    end
  '
}
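The carry-forward logic in that filter can be sketched in plain Ruby, using hashes as hypothetical stand-ins for Logstash events and a local variable in place of the filter's @user instance variable:

```ruby
# Sketch of the carry-forward logic: when an event has no "users" field,
# fill it in from the most recent event that did have one.
events = [
  { "users" => "alice", "completion" => 10 },
  { "completion" => 20 },                     # no users field, inherits "alice"
  { "users" => "bob", "completion" => 5 },
  { "completion" => 15 }                      # no users field, inherits "bob"
]

last_user = nil
filled = events.map do |e|
  if e["users"]
    last_user = e["users"]   # remember the last value seen
  else
    e["users"] = last_user   # back-fill from the remembered value
  end
  e
end
```

This only works if events arrive in their original order, which is why pipeline.workers must be 1 and pipeline.ordered must evaluate to true.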