Aggregate filter - calculate difference between timestamps


I'm using the following filters in my pipeline:

filter {

    xml {
            id => "xmlparsing_agg"
            source => "message"
            target => "doc"
            force_array => false
            remove_namespaces => true
    }

    elasticsearch {
            hosts => ["host1:9200,host2:9200"]
            index => "service*"
            query_template => "/etc/logstash/lookupHostname_agg.json"
            fields => {"HostName" => "HostName"}
    }

    if [doc][eventPointData][eventData][eventSequence][wmb:creationTime] {
        mutate {
                add_field => {
                        "[CreationTime]" => "%{[doc][eventPointData][eventData][eventSequence][wmb:creationTime]}"
                        "[EventName]" => "%{[doc][eventPointData][eventData][eventIdentity][wmb:eventName]}"
                        "[MessageFlow]" => "%{[doc][eventPointData][messageFlowData][messageFlow][wmb:uniqueFlowName]}"
                        "[ParentTransactionID]" => "%{[doc][eventPointData][eventData][eventCorrelation][wmb:parentTransactionId]}"
                        "[GlobalTransactionID]" => "%{[doc][eventPointData][eventData][eventCorrelation][wmb:globalTransactionId]}"
                        "[LocalTransactionID]" => "%{[doc][eventPointData][eventData][eventCorrelation][wmb:localTransactionId]}"
                        "[MessageSequence]" => "%{[doc][eventPointData][eventData][eventSequence][wmb:counter]}"
                        "[BrokerUUID]" => "%{[doc][eventPointData][messageFlowData][broker][wmb:UUID]}"
                        "[Properties]" => "%{[doc][applicationData][complexContent][Root][Properties]}"
                        "[MQMD]" => "No MQMD available"
                        "[HTTPInputHeader]" => "No HTTPInputHeader available"
                        "[Payload]" => "No Payload available"
                        "[LocalEnvironment]" => "No LocalEnvironment available"
                        "[initialEvent]" => "%{[message]}"
                        "[parsedEvent]" => "%{[doc]}"
                        "[mytype]" => "realtime"
                }
        }
        mutate {
                remove_field => [ "doc", "@version", "message" ]
        }
        aggregate {
                task_id => "%{LocalTransactionID}"
                code => "
                        map['HostName'] = event.get('HostName')
                        map['wmbevents'] ||= 0
                        map['wmbevents'] += 1
                        map['subevents'] ||= []
                        map['subevents'] << {
                                'name' => event.get('EventName'),
                                'sequence' => event.get('MessageSequence'),
                                'se_time' => event.get('CreationTime')
                        }
                        map['mytype'] ||= 'aggregated'
                "
                push_map_as_event_on_timeout => true
                timeout_task_id_field => "LocalTransactionID"
                timeout => 180
                inactivity_timeout => 30
                timeout_tags => ['_aggregatetimeout']
                timeout_code => "event.set('multiple_wmbevents', event.get('wmbevents') > 1)"
        }
    }
}

Input is in the form of XML events (generated by IBM IIB) and shipped from a JMS queue to a Kafka cluster by a separate pipeline. They're read from the cluster using a Kafka input plugin in this pipeline. Output goes to Elasticsearch.

Here's an example (source) from the Elasticsearch 'aggregated' index:

"multiple_wmbevents": true,
"@timestamp": "2017-12-15T10:00:46.001Z",
"LocalTransactionID": "66d7a4fe-e17b-11e7-90b0-0ac128210000-1",
"subevents": [
    {
        "name": "DocumentCore_Response",
        "sequence": "4",
        "se_time": "2017-12-15T09:59:59.612971Z"
    },
    {
        "name": "DocumentCore_Request",
        "sequence": "1",
        "se_time": "2017-12-15T09:59:59.158861Z"
    },
    {
        "name": "SFDCDocument_Response",
        "sequence": "3",
        "se_time": "2017-12-15T09:59:59.609569Z"
    },
    {
        "name": "SFDCDocument_Request",
        "sequence": "2",
        "se_time": "2017-12-15T09:59:59.379210Z"
    }
],
"@version": "1",
"mytype": "aggregated",
"HostName": "esb-tt01",
"wmbevents": 4,
"tags": [

The whole Elastic stack is on 6.0.0.

I've got the following issues/questions:

  • I reused the mutate filter from another pipeline because referencing the nested field directly as the task_id in the aggregate filter, like this: task_id => "%{[doc][eventPointData][eventData][eventCorrelation][wmb:localTransactionId]}", doesn't seem to work. Did I miss something here?

  • I'd like to extend the Ruby code of the aggregate filter to calculate the difference between the se_time of the subevent with the highest sequence and the se_time of the subevent with the lowest sequence. Is there any chance someone could help me with that? (Sorry, Ruby's totally new to me.)

  • Every now and then the XML filter throws an error in the logs about an undefined namespace. Is there a way to handle this so I can dump the whole thing in a separate index? I know there's the tag_on_failure in the JSON filter but I didn't find the equivalent in the documentation of the XML filter.

  • The initial XML events contain a lot of 'overhead' in terms of tags that are not needed in the final document in Elasticsearch. As you can see I'm just creating a 'flat' structure by putting everything in the root of the event with the mutate filter. Then, I just remove the initial XML and the parsed XML with another mutate filter. I'm just wondering: is this the 'right' way to use the XML filter, in terms of performance?

Thank you very much for your help!


Hi @magnusbaeck, any chance you could have a look at my questions? Thanks a lot!
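
On the second question (the time difference between the subevents with the lowest and highest sequence), here is a minimal sketch in plain Ruby, not a tested Logstash configuration. It assumes each subevent is a hash with the 'sequence' and 'se_time' keys shown in the example document, that 'sequence' is a numeric string, and that 'se_time' is an ISO 8601 timestamp. The helper name subevent_duration is made up for illustration.

```ruby
require 'time'

# Hypothetical helper (name is an assumption, not part of any plugin).
# Returns the number of seconds between the subevent with the lowest
# sequence and the subevent with the highest sequence, or nil when there
# are fewer than two subevents.
def subevent_duration(subevents)
  return nil if subevents.nil? || subevents.size < 2

  # 'sequence' is stored as a string ("1", "4"), so compare it numerically.
  first, last = subevents.minmax_by { |s| s['sequence'].to_i }

  # Time.iso8601 keeps the fractional seconds; Time - Time yields a Float.
  Time.iso8601(last['se_time']) - Time.iso8601(first['se_time'])
end

# Sample data copied from the aggregated document above.
subevents = [
  { 'name' => 'DocumentCore_Response', 'sequence' => '4', 'se_time' => '2017-12-15T09:59:59.612971Z' },
  { 'name' => 'DocumentCore_Request',  'sequence' => '1', 'se_time' => '2017-12-15T09:59:59.158861Z' },
  { 'name' => 'SFDCDocument_Response', 'sequence' => '3', 'se_time' => '2017-12-15T09:59:59.609569Z' },
  { 'name' => 'SFDCDocument_Request',  'sequence' => '2', 'se_time' => '2017-12-15T09:59:59.379210Z' }
]

puts subevent_duration(subevents).round(6)
```

For the sample document this comes out at about 0.454 seconds. Inside the aggregate filter, the same logic would presumably belong in timeout_code, since that code runs against the generated event that is pre-populated with the map: read the array with event.get('subevents') and store the result with event.set under a field name of your choice. Whether require 'time' is needed there depends on what Logstash has already loaded, so treat that part as an assumption to verify.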
