Hello,
I'm using the following filters in my pipeline:
filter {
  # parse the raw IIB XML event into the [doc] field
  xml {
    id => "xmlparsing_agg"
    source => "message"
    target => "doc"
    force_array => false
    remove_namespaces => true
  }
  # enrich the event with a HostName looked up from an existing index
  elasticsearch {
    hosts => ["host1:9200", "host2:9200"]
    index => "service*"
    query_template => "/etc/logstash/lookupHostname_agg.json"
    fields => { "HostName" => "HostName" }
  }
  if [doc][eventPointData][eventData][eventSequence][wmb:creationTime] {
    # flatten the fields we need into the root of the event
    mutate {
      add_field => {
        "[CreationTime]" => "%{[doc][eventPointData][eventData][eventSequence][wmb:creationTime]}"
        "[EventName]" => "%{[doc][eventPointData][eventData][eventIdentity][wmb:eventName]}"
        "[MessageFlow]" => "%{[doc][eventPointData][messageFlowData][messageFlow][wmb:uniqueFlowName]}"
        "[ParentTransactionID]" => "%{[doc][eventPointData][eventData][eventCorrelation][wmb:parentTransactionId]}"
        "[GlobalTransactionID]" => "%{[doc][eventPointData][eventData][eventCorrelation][wmb:globalTransactionId]}"
        "[LocalTransactionID]" => "%{[doc][eventPointData][eventData][eventCorrelation][wmb:localTransactionId]}"
        "[MessageSequence]" => "%{[doc][eventPointData][eventData][eventSequence][wmb:counter]}"
        "[BrokerUUID]" => "%{[doc][eventPointData][messageFlowData][broker][wmb:UUID]}"
        "[Properties]" => "%{[doc][applicationData][complexContent][Root][Properties]}"
        "[MQMD]" => "No MQMD available"
        "[HTTPInputHeader]" => "No HTTPInputHeader available"
        "[Payload]" => "No Payload available"
        "[LocalEnvironment]" => "No LocalEnvironment available"
        "[initialEvent]" => "%{[message]}"
        "[parsedEvent]" => "%{[doc]}"
        "[mytype]" => "realtime"
      }
    }
  }
  # drop the raw and parsed XML once the needed fields have been copied out
  mutate {
    remove_field => [ "doc", "@version", "message" ]
  }
  # collect all events that share the same LocalTransactionID into one map
  aggregate {
    task_id => "%{LocalTransactionID}"
    code => "
      map['HostName'] = event.get('HostName')
      map['wmbevents'] ||= 0
      map['wmbevents'] += 1
      map['subevents'] ||= []
      map['subevents'] << {
        'name' => event.get('EventName'),
        'sequence' => event.get('MessageSequence'),
        'se_time' => event.get('CreationTime')
      }
      map['mytype'] ||= 'aggregated'
      event.cancel()
    "
    push_map_as_event_on_timeout => true
    timeout_task_id_field => "LocalTransactionID"
    timeout => 180
    inactivity_timeout => 30
    timeout_tags => ['_aggregatetimeout']
    timeout_code => "event.set('multiple_wmbevents', event.get('wmbevents') > 1)"
  }
}
Input is XML events generated by IBM IIB, shipped from a JMS queue to a Kafka cluster by a separate pipeline. This pipeline reads them from the cluster with a Kafka input plugin, and output goes to Elasticsearch.
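For completeness, the input and output sections look roughly like this (broker, topic, and index names are placeholders):

input {
  kafka {
    bootstrap_servers => "kafka1:9092,kafka2:9092"  # placeholder brokers
    topics => ["iib-events"]                        # placeholder topic
    group_id => "logstash-agg"
  }
}

output {
  elasticsearch {
    hosts => ["host1:9200", "host2:9200"]
    index => "aggregated-%{+YYYY.MM.dd}"  # placeholder index pattern
  }
}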
Here's an example (source) from the Elasticsearch 'aggregated' index:
"multiple_wmbevents": true,
"@timestamp": "2017-12-15T10:00:46.001Z",
"LocalTransactionID": "66d7a4fe-e17b-11e7-90b0-0ac128210000-1",
"subevents": [
{
"name": "DocumentCore_Response",
"sequence": "4",
"se_time": "2017-12-15T09:59:59.612971Z"
},
{
"name": "DocumentCore_Request",
"sequence": "1",
"se_time": "2017-12-15T09:59:59.158861Z"
},
{
"name": "SFDCDocument_Response",
"sequence": "3",
"se_time": "2017-12-15T09:59:59.609569Z"
},
{
"name": "SFDCDocument_Request",
"sequence": "2",
"se_time": "2017-12-15T09:59:59.379210Z"
}
],
"@version": "1",
"mytype": "aggregated",
"HostName": "esb-tt01",
"wmbevents": 4,
"tags": [
"_aggregatetimeout"
]
}
The whole Elastic stack is on 6.0.0.
I've got the following issues/questions:

- I reused the mutate filter from another pipeline because, for some reason, putting the task_id into the aggregate filter directly, like this:
  task_id => "%{[doc][eventPointData][eventData][eventCorrelation][wmb:localTransactionId]}"
  doesn't work. Did I miss something here?
- I'd like to extend the Ruby code of the aggregate filter to calculate the difference between the se_time of the subevent with the highest sequence and the subevent with the lowest sequence. Is there any chance someone could help me with that? Ruby is totally new to me; my rough, untested guess is in the first sketch below.
- Every now and then the XML filter throws an error in the logs about an undefined namespace. Is there a way to handle this so I can dump the whole event into a separate index? The second sketch below shows what I have in mind. I know there's tag_on_failure in the JSON filter, but I didn't find an equivalent in the documentation of the XML filter.
- The initial XML events contain a lot of 'overhead' in terms of tags that aren't needed in the final document in Elasticsearch. As you can see, I just create a 'flat' structure by copying everything into the root of the event with the first mutate filter, then remove the initial XML and the parsed XML with the second mutate filter. I'm wondering: is this the 'right' way to use the XML filter, in terms of performance?
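First sketch (for the se_time question): a minimal, untested guess at an extended timeout_code, assuming the subevents array ends up looking like the example document above; subevent_duration_s is just a field name I made up:

timeout_code => "
  require 'time'  # needed for Time.parse
  # keep the existing flag
  event.set('multiple_wmbevents', event.get('wmbevents') > 1)
  # sort the collected subevents by their numeric sequence
  subevents = event.get('subevents')
  if subevents && subevents.size > 1
    sorted = subevents.sort_by { |se| se['sequence'].to_i }
    first  = Time.parse(sorted.first['se_time'])
    last   = Time.parse(sorted.last['se_time'])
    # elapsed seconds between the lowest and highest sequence
    event.set('subevent_duration_s', (last - first).round(3))
  end
"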
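Second sketch (for the namespace-error question): roughly what I'd like the output section to do, assuming the XML filter can tag failed events somehow; the _xmlparsefailure tag and the index names are just my guess at how it might look:

output {
  if "_xmlparsefailure" in [tags] {
    # events the XML filter couldn't parse go to their own index
    elasticsearch {
      hosts => ["host1:9200", "host2:9200"]
      index => "xml-failures-%{+YYYY.MM.dd}"
    }
  } else {
    elasticsearch {
      hosts => ["host1:9200", "host2:9200"]
      index => "aggregated-%{+YYYY.MM.dd}"
    }
  }
}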
Thank you very much for your help!
Jonas