I have a collection of XML files, each one containing a single event... Most of them are "parent" events, but some are "child" events related to a parent. I'd like to aggregate them, creating an "nfe" field for the parent and a collection (array) for the children (field "eventos").
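Roughly, the aggregated document I'm aiming for would look something like this (just a sketch of the shape, not actual output; the nested values would be the parsed XML):

{
  "id": "NFe43180893327161000175550010000588171770788542",
  "nfe": { ...parsed parent XML... },
  "eventos": [
    { ...parsed child XML... }
  ]
}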
My test case is only 2 files: one parent and one child.
The thing is, I couldn't even get the aggregate filter to trigger... Rubydebug always shows 2 events in the output. Elasticsearch indexes only 1 document, but it is not an aggregated result of both files: the second event in the pipeline simply overwrites the first one in the ES index, since both of them have the same "id".
As soon as I get the aggregation to trigger I can then organize my map to output something meaningful to ES, but at the moment I'm still fighting with the aggregate filter.
Thank you in advance for your help...
Here is my pipeline conf:
input {
  stdin { }
}
filter {
  mutate {
    replace => { "message" => "<xml>%{message}</xml>" }
  }
  xml {
    source => "message"
    target => "origem"
  }
  # Parent event
  if [origem][nfeProc] {
    mutate {
      add_field => {
        "id" => "%{[origem][nfeProc][0][NFe][0][infNFe][0][Id]}"
        "timestamp" => "%{[origem][nfeProc][0][protNFe][0][infProt][0][dhRecbto]}"
      }
    }
  }
  # Child/nested events
  else if [origem][procEventoNFe] {
    mutate {
      add_field => {
        "id" => "NFe%{[origem][procEventoNFe][0][retEvento][0][infEvento][0][chNFe][0]}"
        "timestamp" => "%{[origem][procEventoNFe][0][retEvento][0][infEvento][0][dhRegEvento]}"
      }
    }
  } else {
    mutate {
      add_tag => [ "error" ]
    }
  }
  # At this point both types of events (parent and child) should have
  # "id" : "NFe43180893327161000175550010000588171770788542",
  # but somehow I could not aggregate them by id;
  # it's like the aggregate filter is never triggered
  aggregate {
    task_id => "%{id}"
    code => "map['x'] = 'test'"
    timeout => 600
    timeout_task_id_field => "id"
    timeout_tags => ['_aggregatetimeout']
  }
  mutate {
    remove_field => ["message", "host", "origem", "timestamp"]
  }
}
output {
  stdout { codec => rubydebug }
  elasticsearch {
    hosts => ["vmsrv103:9200"]
    index => "nfe"
    document_type => "default"
    document_id => "%{id}"
  }
}
The only thing you are adding to the map is the field x, so that is all that I would expect to be in the event when the timeout fires. Do you really need a 10 minute timeout?
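For example, once the filter does fire, something along these lines (untested, just a sketch that reuses the field names from your conf) would carry both documents into the event that gets pushed on timeout:

aggregate {
  task_id => "%{id}"
  code => "
    # keep the parent document once, collect the children in an array
    if event.get('[origem][nfeProc]')
      map['nfe'] = event.get('[origem][nfeProc]')
    else
      map['eventos'] ||= []
      map['eventos'] << event.get('[origem][procEventoNFe]')
    end
    event.cancel  # drop the individual events; only the pushed map reaches the output
  "
  push_map_as_event_on_timeout => true
  timeout => 10
  timeout_task_id_field => "id"
  timeout_tags => ['_aggregatetimeout']
}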
This version is the "oh for the love of God please work" one... In the original version I had some code adding more fields to the map and a timeout of 3.
The thing is, not even this "x" is being added to the output. As a matter of fact I still don't think it is being aggregated since the output is something like this (2 events when I expected only one):
The input lines are too large to paste here, but they look something like below. Are you looking for something specific in the input lines?
Thank you
Parent:
<nfeProc xmlns="http://www.portalfiscal.inf.br/nfe" versao="4.00"><NFe xmlns="http://www.portalfiscal.inf.br/nfe"><infNFe versao="4.00" Id="NFe43180893327161000175550010000588171770788542"> (a lot of XML, but the Id is here as an attribute of infNFe) </infNFe></NFe></nfeProc>
Child:
<procEventoNFe xmlns="http://www.portalfiscal.inf.br/nfe" versao="1.00"> (a lot of XML...) <retEvento versao="1.00" xmlns="http://www.portalfiscal.inf.br/nfe"><infEvento><chNFe>43180893327161000175550010000588171770788542</chNFe><tpEvento>110110</tpEvento><nSeqEvento>1</nSeqEvento><CNPJDest>89550032000174</CNPJDest><dhRegEvento>2018-08-07T17:16:29-03:00</dhRegEvento></infEvento></retEvento></procEventoNFe>
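For reference, the xml filter in my conf leaves force_array at its default (true), so every element gets wrapped in an array when parsed, which is why all the paths carry [0] indexes. Restating that filter with the resulting paths for the samples above spelled out:

xml {
  source => "message"
  target => "origem"
  # force_array defaults to true, so every element becomes an array:
  #   parent Id   -> [origem][nfeProc][0][NFe][0][infNFe][0][Id]  (XML attribute, stays a scalar)
  #   child chNFe -> [origem][procEventoNFe][0][retEvento][0][infEvento][0][chNFe][0]  (XML element, becomes an array)
}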
Thank you for figuring that out!
I can now move on with this conf file...
The only thing is, I wanted the behaviour that Logstash has with stdin: process whatever comes into the pipeline and then shut down.
Is there any way to achieve that? Just to be clear, I'll run this pipeline every now and then through a configuration in Jenkins... I don't want Logstash to wait for more input (that will not come until the following month)
Summing it up: how could I, with a file-input, make Logstash stop after a timeout?
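Something like this is what I mean by a file-input (just a sketch of the input section; the paths are placeholders):

input {
  file {
    path => "/data/nfe/*.xml"
    mode => "read"
    sincedb_path => "/dev/null"
    file_completed_action => "log"
    file_completed_log_path => "/data/nfe/processed.log"
  }
}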
I don't think that can be done. The stdin input is unique in the way it stops Logstash. For other inputs you would need a script that monitors Logstash externally and kills it once it has finished.