Could not get aggregate filter to... well, aggregate

I have a collection of XML files, each one containing a single event. Most of them are "parent" events, but some are "child" events related to a parent. I'd like to aggregate them, creating an "nfe" field for the parent and an array of the children in a field called "eventos".
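
Roughly, this is the kind of document I'd like to end up with in Elasticsearch (purely illustrative; the nested contents are just placeholders):

{
    "id": "NFe43180893327161000175550010000588171770788542",
    "nfe": { "infNFe": "... fields from the parent nfeProc XML ..." },
    "eventos": [
        { "infEvento": "... fields from each child procEventoNFe XML ..." }
    ]
}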

My test case is only 2 files: one parent and one child.

The thing is, I couldn't even get the aggregate filter to trigger. Rubydebug always shows 2 events in the output, and Elasticsearch ends up with only 1 document, but it is not an aggregated result of both files: the second event in the pipeline simply overwrites the first one in the ES index, since both have the same "id".

As soon as I get the aggregation to trigger I can then organize my map to output something meaningful to ES, but at the moment I'm still fighting with the aggregate filter.

Thank you in advance for your help...

Here is my pipeline conf:

input {
	stdin { }
}
filter {
	mutate {
		replace => { "message" => "<xml>%{message}</xml>" }
	}

	xml {
		source => "message"
		target => "origem"
	}

    # Parent event
	if [origem][nfeProc] {
		mutate {
			add_field => {
				"id" => "%{[origem][nfeProc][0][NFe][0][infNFe][0][Id]}"
				"timestamp" => "%{[origem][nfeProc][0][protNFe][0][infProt][0][dhRecbto]}"
			}
		}
	} 
	# Child/nested events
	else if [origem][procEventoNFe] {
		mutate {
			add_field => {
				"id" => "NFe%{[origem][procEventoNFe][0][retEvento][0][infEvento][0][chNFe][0]}"
				"timestamp" => "%{[origem][procEventoNFe][0][retEvento][0][infEvento][0][dhRegEvento]}"
			}
		}
	} else {
		mutate {
			add_tag => [ "error" ]
  		}
	}

    # At this point both types of events (parent and child) should have "id" : "NFe43180893327161000175550010000588171770788542"
    
    # Somehow I could not aggregate them by id
    # it's like the aggregate filter is never triggered
	aggregate {
		task_id => "%{id}"
		code => "map['x'] = 'test'"
		timeout => 600
		timeout_task_id_field => "id"
		timeout_tags => ['_aggregatetimeout']
		push_map_as_event_on_timeout => true
	}

	mutate {
		remove_field => ["message", "host", "origem", "timestamp"]
	}
}
output {
	stdout { codec => rubydebug }
	
	elasticsearch {
    	hosts => ["vmsrv103:9200"]
    	index => "nfe"
    	document_type => "default"
    	document_id => "%{id}"
	}
}

The only thing you are adding to the map is the field x, so that is all that I would expect to be in the event when the timeout fires. Do you really need a 10 minute timeout?

Can you show some actual input lines?

This version is the "oh for the love of God please work" one... In the original version I had some code adding more fields to the map and a timeout of 3.

The thing is, not even this "x" is being added to the output. As a matter of fact, I still don't think anything is being aggregated, since the output is something like this (2 events when I expected only one):

{
            "id" => "NFe43180893327161000175550010000588171770788542",
    "@timestamp" => 2019-01-23T17:36:47.713Z,
      "@version" => "1"
}
{
            "id" => "NFe43180893327161000175550010000588171770788542",
    "@timestamp" => 2019-01-23T17:36:47.785Z,
      "@version" => "1"
}

The input lines are too large to paste here, but they look something like the samples below. Are you looking for something specific in the input lines?

Thank you

Parent:

<nfeProc xmlns="http://www.portalfiscal.inf.br/nfe" versao="4.00"><NFe xmlns="http://www.portalfiscal.inf.br/nfe"><infNFe versao="4.00" Id="NFe43180893327161000175550010000588171770788542"> (a lot of XML, but the Id is here as an attribute of infNFe) </infNFe></NFe></nfeProc>

Child:

<procEventoNFe xmlns="http://www.portalfiscal.inf.br/nfe" versao="1.00"> (a lot of XML...) <retEvento versao="1.00" xmlns="http://www.portalfiscal.inf.br/nfe"><infEvento><chNFe>43180893327161000175550010000588171770788542</chNFe><tpEvento>110110</tpEvento><nSeqEvento>1</nSeqEvento><CNPJDest>89550032000174</CNPJDest><dhRegEvento>2018-08-07T17:16:29-03:00</dhRegEvento></infEvento></retEvento></procEventoNFe>

The problem is not your aggregate filter :smiley:

When a stdin input reaches EOF, it shuts logstash down. There is a similar problem with a generator input. Thus

input { generator { count => 1 message => '{ "id": "NFe43180893327161000175550010000588171770788542"}' } }
filter { json { source => "message" } }
filter {
    aggregate {
        task_id => "%{id}"
        code => "map['x'] = 'test'"
        timeout => 6
        timeout_task_id_field => "id"
        timeout_tags => ['_aggregatetimeout']
        push_map_as_event_on_timeout => true
    }
}
output { stdout { codec => rubydebug } }

dumps the event but not the aggregate because

[INFO ][logstash.pipeline ] Pipeline has terminated {:pipeline_id=>"main"

Read the XML from a file and you will get an aggregate

For example

input { file { path => "/somePath/foo.xml" sincedb_path => "/dev/null" start_position => "beginning" } }
filter {
    xml { source => "message" target => "theXML" }
    aggregate {
        task_id => "%{[theXML][id]}"
        code => "map['x'] = 'test'"
        timeout => 6
        timeout_task_id_field => "id"
        timeout_tags => ['_aggregatetimeout']
        push_map_as_event_on_timeout => true
    }
}
output { stdout { codec => rubydebug } }

gets me

{
     "theXML" => {
        "id" => [
            [0] "NFe43180893327161000175550010000588171770788542"
        ]
    },
    "message" => "<foo><id>NFe43180893327161000175550010000588171770788542</id></foo>"
}
{
      "x" => "test",
   "tags" => [
       [0] "_aggregatetimeout"
   ],
     "id" => "NFe43180893327161000175550010000588171770788542"
}
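
Once the pipeline is no longer shutting down early, replacing the test aggregate filter in your config with something along these lines might get you the document you described. This is only a sketch: it reuses the [origem] fields and the %{id} that your existing filters already create, and it cancels the individual events so that only the aggregated event pushed on timeout reaches elasticsearch.

    aggregate {
        task_id => "%{id}"
        code => "
            if event.get('[origem][nfeProc]')
                # parent: keep the whole parsed nfeProc under 'nfe'
                map['nfe'] = event.get('[origem][nfeProc]')
            elsif event.get('[origem][procEventoNFe]')
                # child: append each parsed procEventoNFe to the 'eventos' array
                map['eventos'] ||= []
                map['eventos'] << event.get('[origem][procEventoNFe]')
            end
            # drop the individual event; only the map pushed on timeout survives
            event.cancel()
        "
        timeout => 6
        timeout_task_id_field => "id"
        timeout_tags => ['_aggregatetimeout']
        push_map_as_event_on_timeout => true
    }

The map keys ('nfe' and 'eventos') are just the names from your description, so adjust them to whatever structure you want the final document to have.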

Thank you for figuring that out!
I can now move on with this conf file...

The only thing is, I wanted the behaviour that Logstash has with stdin: process whatever comes into the pipeline and then shut down.

Is there any way to achieve that? Just to be clear, I'll run this pipeline every now and then through a configuration in Jenkins... I don't want Logstash to wait for more input (that will not come until the following month)

Summing it up: how could I, with a file-input, make Logstash stop after a timeout?

I don't think that can be done. The stdin input is unique in the way it stops logstash. For other inputs you would need a script to monitor logstash externally and kill it once it has finished.
