Hello,
I'm using the following filters in my pipeline:
filter {
    xml {
            id => "xmlparsing_agg"
            source => "message"
            target => "doc"
            force_array => false
            remove_namespaces => true
    }
    elasticsearch {
            hosts => ["host1:9200,host2:9200"]
            index => "service*"
            query_template => "/etc/logstash/lookupHostname_agg.json"
            fields => {"HostName" => "HostName"}
    }
    if [doc][eventPointData][eventData][eventSequence][wmb:creationTime] {
    mutate {
            add_field => {
                    "[CreationTime]" => "%{[doc][eventPointData][eventData][eventSequence][wmb:creationTime]}"
                    "[EventName]" => "%{[doc][eventPointData][eventData][eventIdentity][wmb:eventName]}"
                    "[MessageFlow]" => "%{[doc][eventPointData][messageFlowData][messageFlow][wmb:uniqueFlowName]}"
                    "[ParentTransactionID]" => "%{[doc][eventPointData][eventData][eventCorrelation][wmb:parentTransactionId]}"
                    "[GlobalTransactionID]" => "%{[doc][eventPointData][eventData][eventCorrelation][wmb:globalTransactionId]}"
                    "[LocalTransactionID]" => "%{[doc][eventPointData][eventData][eventCorrelation][wmb:localTransactionId]}"
                    "[MessageSequence]" => "%{[doc][eventPointData][eventData][eventSequence][wmb:counter]}"
                    "[BrokerUUID]" => "%{[doc][eventPointData][messageFlowData][broker][wmb:UUID]}"
                    "[Properties]" => "%{[doc][applicationData][complexContent][Root][Properties]}"
                    "[MQMD]" => "No MQMD available"
                    "[HTTPInputHeader]" => "No HTTPInputHeader available"
                    "[Payload]" => "No Payload available"
                    "[LocalEnvironment]" => "No LocalEnvironment available"
                    "[initialEvent]" => "%{[message]}"
                    "[parsedEvent]" => "%{[doc]}"
                    "[mytype]" => "realtime"
            }
    }
    }
    mutate {
            remove_field => [ "doc", "@version", "message" ]
    }
    aggregate {
            task_id => "%{LocalTransactionID}"
            code => "
                    map['HostName'] = event.get('HostName')
                    map['wmbevents'] ||= 0
                    map['wmbevents'] += 1
                    map['subevents'] ||= []
                    map['subevents'] << {
                            'name' => event.get('EventName'),
                            'sequence' => event.get('MessageSequence'),
                            'se_time' => event.get('CreationTime')
                    }
                    map['mytype'] ||= 'aggregated'
                    event.cancel()
            "
            push_map_as_event_on_timeout => true
            timeout_task_id_field => "LocalTransactionID"
            timeout => 180
            inactivity_timeout => 30
            timeout_tags => ['_aggregatetimeout']
            timeout_code => "event.set('multiple_wmbevents', event.get('wmbevents') > 1)"
    }
}
The input consists of XML events generated by IBM IIB; a separate pipeline ships them from a JMS queue to a Kafka cluster, and this pipeline reads them from the cluster with the Kafka input plugin. The output goes to Elasticsearch.
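For reference, the input and output sections of this pipeline look roughly like this (broker addresses, topic and index names are placeholders, not my real ones):

input {
    kafka {
            bootstrap_servers => "kafka1:9092,kafka2:9092"
            topics => ["iib-monitoring-events"]
    }
}
output {
    elasticsearch {
            hosts => ["host1:9200", "host2:9200"]
            index => "aggregated-%{+YYYY.MM.dd}"
    }
}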
Here's an example document (_source) from the 'aggregated' index in Elasticsearch:
"multiple_wmbevents": true,
"@timestamp": "2017-12-15T10:00:46.001Z",
"LocalTransactionID": "66d7a4fe-e17b-11e7-90b0-0ac128210000-1",
"subevents": [
  {
    "name": "DocumentCore_Response",
    "sequence": "4",
    "se_time": "2017-12-15T09:59:59.612971Z"
  },
  {
    "name": "DocumentCore_Request",
    "sequence": "1",
    "se_time": "2017-12-15T09:59:59.158861Z"
  },
  {
    "name": "SFDCDocument_Response",
    "sequence": "3",
    "se_time": "2017-12-15T09:59:59.609569Z"
  },
  {
    "name": "SFDCDocument_Request",
    "sequence": "2",
    "se_time": "2017-12-15T09:59:59.379210Z"
  }
],
"@version": "1",
"mytype": "aggregated",
"HostName": "esb-tt01",
"wmbevents": 4,
"tags": [
  "_aggregatetimeout"
]
}
The whole Elastic Stack is on 6.0.0.
I've got the following issues/questions:

- I reused the mutate filter from another pipeline because, for some reason, putting the task_id in the aggregate filter directly, like this:
  task_id => "%{[doc][eventPointData][eventData][eventCorrelation][wmb:localTransactionId]}"
  doesn't work. Did I miss something here?
- I'd like to extend the Ruby code of the aggregate filter to calculate the difference between the se_time of the subevent with the highest sequence and the subevent with the lowest sequence. Is there any chance someone could help me with that? (Sorry, Ruby is totally new to me.) I've put a rough, untested sketch of what I mean after this list.
- Every now and then the XML filter throws an error in the logs about an undefined namespace. Is there a way to handle this so I can dump the whole event into a separate index? I know there's tag_on_failure in the JSON filter, but I didn't find an equivalent in the documentation of the XML filter. (The second sketch after this list shows the kind of routing I'm after.)
- The initial XML events contain a lot of 'overhead' in terms of tags that aren't needed in the final document in Elasticsearch. As you can see, I'm just creating a 'flat' structure by putting everything in the root of the event with the mutate filter, and then removing the initial XML and the parsed XML with another mutate filter. I'm just wondering: is this the 'right' way to use the XML filter, in terms of performance?
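For the duration calculation (second question), something along these lines is roughly what I have in mind, replacing the timeout_code of the aggregate filter. It's untested and the Time parsing is a guess on my part:

timeout_code => "
        require 'time'
        event.set('multiple_wmbevents', event.get('wmbevents') > 1)
        subevents = event.get('subevents') || []
        if subevents.size > 1
                # sort the collected subevents by their sequence number
                sorted = subevents.sort_by { |se| se['sequence'].to_i }
                first = Time.parse(sorted.first['se_time'])
                last  = Time.parse(sorted.last['se_time'])
                # difference between last and first subevent, in milliseconds
                event.set('duration_ms', ((last - first) * 1000).round)
        end
"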
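And for the XML parsing errors (third question), what I'd like to end up with in the output section is a conditional like the one below. The _xmlparsefailure tag is just my assumption of what such a failure tag might be called (by analogy with the JSON filter), and the index names are placeholders as in the earlier sketch:

output {
    if "_xmlparsefailure" in [tags] {
            elasticsearch {
                    hosts => ["host1:9200", "host2:9200"]
                    index => "xml-failures-%{+YYYY.MM.dd}"
            }
    } else {
            elasticsearch {
                    hosts => ["host1:9200", "host2:9200"]
                    index => "aggregated-%{+YYYY.MM.dd}"
            }
    }
}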
 
Thank you very much for your help!
Jonas