Using
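# Parse the JSON log line into [@metadata][data]. @metadata is not part of
# event.to_hash and is not sent to outputs, so the parsed fields only reach
# the result through the aggregate code below.
json {
  source => "message"
  target => "[@metadata][data]"
  remove_field => [ "message" ]
}

# Type is read from the log line and becomes a top-level field name of the
# aggregated event, so only the two expected values are aggregated. Anything
# else (a missing Type, or a value such as "tags" or "ExchangeId" that would
# collide with fields the filter writes itself) is tagged and passed through.
# The tag name below is a constructed example; rename it to fit your pipeline.
if [@metadata][data][Type] in [ "REQ_OUT", "RESP_IN" ] {
  # The aggregate filter needs a single pipeline worker (pipeline.workers: 1);
  # with more workers the events of one exchange can be processed out of order.
  aggregate {
    # One map per exchange: the request and its response share an ExchangeId.
    task_id => "%{[@metadata][data][ExchangeId]}"
    # Seconds since the first event of a task before the map is flushed; a
    # response that arrives later than this ends up in a separate event.
    timeout => 5
    push_map_as_event_on_timeout => true
    code => '
      # Build one nested object per message type (REQ_OUT, RESP_IN).
      # Do not use single quotes in here, they would end the code string.
      type = event.get("[@metadata][data][Type]")
      map[type] = {}
      # Parsed JSON fields first; ExchangeId is left out because the filter
      # writes it at the top level via timeout_task_id_field.
      event.get("[@metadata][data]").each do |k, v|
        map[type][k] = v unless k == "ExchangeId"
      end
      # Then the fields of the event itself (everything except @version);
      # on a name clash these overwrite the parsed value.
      event.to_hash.each do |k, v|
        map[type][k] = v unless k == "@version"
      end
      # Drop the source event; only the aggregated map is emitted.
      event.cancel
    '
    timeout_task_id_field => "ExchangeId"
    timeout_tags => ['_aggregatetimeout']
  }
} else {
  mutate { add_tag => [ "_unexpected_exchange_type" ] }
}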
I get
{
"RESP_IN" => {
"RESP_IN" => "test2",
"severity" => "INFO",
"path" => "/home/user/foo.txt",
"Type" => "RESP_IN",
"ResponseCode" => "200",
"@timestamp" => 2021-04-14T14:57:51.115Z,
"Address" => "https://a-link.com",
"host" => "...",
"timestamp" => "2021-04-02T05:50:45.534Z"
},
"REQ_OUT" => {
"severity" => "INFO",
"path" => "/home/user/foo.txt",
"Type" => "REQ_OUT",
"REQ_OUT" => "test1",
"@timestamp" => 2021-04-14T14:57:51.115Z,
"Address" => "https://a-link.com",
"host" => "...",
"timestamp" => "2021-04-02T05:50:44.251Z"
},
"@timestamp" => 2021-04-14T14:58:01.101Z,
"@version" => "1",
"ExchangeId" => "260e06a3-9cb5-4154-bf97-637e929fa4c2",
"tags" => [
[0] "_aggregatetimeout"
]
}
Which version of Logstash are you running? I wonder whether you are hitting the issue mentioned in this thread.