Merge message with aggregate filter

Using this filter configuration:

    # Parse the JSON text in "message" into [@metadata][data] and remove the raw "message" field.
    json { source => "message" target => "[@metadata][data]" remove_field => [ "message" ] }
    # Merge events that share the same ExchangeId into one aggregate map.
    aggregate {
        # Events are correlated by the ExchangeId taken from the parsed message.
        task_id => "%{[@metadata][data][ExchangeId]}"
        # Task timeout; the aggregate filter documentation defines this value in seconds.
        timeout => 5
        # When the task times out, emit the accumulated map as a new event.
        push_map_as_event_on_timeout => true
        # The Ruby below nests each source event under map[<Type>]:
        #   1. copies every key of [@metadata][data] except "ExchangeId",
        #   2. copies every top-level event field except "@version"
        #      (a field with the same name as a key from step 1 overwrites it),
        #   3. cancels the original event so only the aggregated event remains.
        # NOTE(review): map[type] is reset to {} for every event, so a later event with
        # the same Type and ExchangeId replaces the earlier entry - confirm this is intended.
        code => '
            # we want 2 nested objects in the document object REQ_OUT and RESP_IN
            type = event.get("[@metadata][data][Type]")
            map[type] = {}
            event.get("[@metadata][data]").each { |k, v|
                unless [ "ExchangeId"].include? k
                    map[type][k] = v
                end
            }
            event.to_hash.each { |k, v|
                unless [ "@version" ].include? k
                    map[type][k] = v
                end
            }
            event.cancel
        '
        # Store the task id on the timeout event in the "ExchangeId" field.
        timeout_task_id_field => "ExchangeId"
        # Tag added to the event generated on timeout.
        timeout_tags => ['_aggregatetimeout']
    }

I get the following output:

{
   "RESP_IN" => {
         "RESP_IN" => "test2",
        "severity" => "INFO",
            "path" => "/home/user/foo.txt",
            "Type" => "RESP_IN",
    "ResponseCode" => "200",
      "@timestamp" => 2021-04-14T14:57:51.115Z,
         "Address" => "https://a-link.com",
            "host" => "...",
       "timestamp" => "2021-04-02T05:50:45.534Z"
},
   "REQ_OUT" => {
      "severity" => "INFO",
          "path" => "/home/user/foo.txt",
          "Type" => "REQ_OUT",
       "REQ_OUT" => "test1",
    "@timestamp" => 2021-04-14T14:57:51.115Z,
       "Address" => "https://a-link.com",
          "host" => "...",
     "timestamp" => "2021-04-02T05:50:44.251Z"
},
"@timestamp" => 2021-04-14T14:58:01.101Z,
  "@version" => "1",
"ExchangeId" => "260e06a3-9cb5-4154-bf97-637e929fa4c2",
      "tags" => [
    [0] "_aggregatetimeout"
]
}

Which version of Logstash are you running? I wonder if you are hitting the issue mentioned in this thread.