input { generator { count => 1 lines => [ '{ "task": 1, "in-mb": 10 }', '{ "task": 1, "cpu": 0.1}', '{ "task": 2, "in-mb": 11 }' ] } }
filter {
json { source => "message" remove_field => [ "message" ] }
aggregate {
task_id => "%{task}"
timeout_task_id_field => "task_id"
timeout_timestamp_field => "@timestamp"
code => '
event.to_hash.each { |k,v|
unless map[k]
map[k] = v
end
}
'
push_previous_map_as_event => true
}
}
output { stdout { codec => rubydebug { metadata => false } } }
will produce these two aggregated events
{
"task" => 1,
"cpu" => 0.1,
"task_id" => "1",
"sequence" => 0,
"in-mb" => 10,
"@version" => "1",
"host" => "dot.dot",
"@timestamp" => 2020-09-30T21:24:07.164Z
}
{
"task" => 2,
"task_id" => "2",
"sequence" => 0,
"in-mb" => 11,
"tags" => [
[0] "_aggregatefinalflush"
],
"@version" => "1",
"host" => "dot.dot",
"@timestamp" => 2020-09-30T21:24:07.251Z
}