Aggregate filter doesn't merge multiple events from Elasticsearch

Hi all,
I have the following lines in my log, which Logstash parses, filters, and stores in Elasticsearch:

IT=1  AC=12345 p1=12345
IW=1 AC=12345 A1=check1 
IW=1 E1=0
IW=2 AC=12345 A1=check2
IW=2 E1=4
IT=1 p2=ok
IT=2  AC=7894 p1=245
IW=4 AC=7894 A1=get 
IW=4 E1=2
IT=2 p2=failed

I filter the above log using two aggregate filters, one based on the value of the IW field and one based on the value of the IT field, and store the results in Elasticsearch as the following events:

   IT=1 AC=12345 p1=12345  p2=ok
   IW=1 A1=check1 E1=0 AC=12345
   IW=2 A1=check2 E1=4 AC=12345
   IT=2 AC=7894 p1=245 p2=failed
   IW=4 A1=get E1=2  AC=7894

After that, I use a Logstash conf file that loads the above events from Elasticsearch and applies an aggregate filter on the field AC, so that events with the same AC value are merged.
The expected results are as follows:

   IT=1 AC=12345 p1=12345   IW={1,2} A1={check1,check2} E1={0,4}  p2=ok
   IT=2 AC=7894 p1=245 IW=4 A1=get E1=2 p2=failed

But the output is as follows (the same as the previous Elasticsearch events), so nothing has been aggregated:

   IT=1 AC=12345 p1=12345  p2=ok
   IW=1 A1=check1 E1=0 AC=12345
   IW=2 A1=check2 E1=4 AC=12345
   IT=2 AC=7894 p1=245 p2=failed
   IW=4 A1=get E1=2  AC=7894

How can I handle this issue?

Any advice would be much appreciated.

Dear @Badger
Could you please advise me about this?
Many thanks

You are asking us to guess why a configuration that you haven't shown us doesn't do what you want. We are not psychic. I cannot help.

@Badger
Sorry, that was my mistake. The Logstash config is as follows:

input {
  elasticsearch {
    hosts => ["http://localhost:9200"]
    index => "myindex"
  }
}
filter {
  aggregate {
    task_id => "%{[AC]}"
    push_map_as_event_on_timeout => true
    timeout_task_id_field => "task_id"
    timeout => 60
    code => '
      if (event.get("IT") != nil)
        map["p1"] ||= []
        map["p1"] = event.get("p1")
        map["p2"] ||= []
        map["p2"] = event.get("p2")
        map["IT"] ||= []
        map["IT"] = event.get("IT")
      end
      if (event.get("IW") != nil)
        map["A1"] ||= []
        map["A1"] << event.get("A1")
        map["E1"] ||= []
        map["E1"] << event.get("E1")
        map["IW"] ||= []
        map["IW"] = event.get("IW")
      end
      event.cancel
    '
  }
}

output {
  elasticsearch {
    hosts => ["http://localhost:9200"]
    index => "myindex2"
  }
  stdout { codec => rubydebug }
}

With that input and that configuration I get

{
        "IW" => "2",
        "p2" => "ok",
        "IT" => "1",
        "A1" => [
    [0] "check1",
    [1] "check2"
],
        "E1" => [
    [0] "0",
    [1] "4"
],
        "p1" => "12345",
  "@version" => "1",
"@timestamp" => 2019-10-06T14:36:16.716Z,
   "task_id" => "12345"
}
{
        "IW" => "4",
        "p2" => "failed",
        "IT" => "2",
        "A1" => [
    [0] "get"
],
        "E1" => [
    [0] "2"
],
        "p1" => "245",
  "@version" => "1",
"@timestamp" => 2019-10-06T14:36:16.726Z,
   "task_id" => "7894"
}

so I don't think you are doing what you think you are doing.

Note that you are setting IW using =, not <<, so you do not get both values.
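
To illustrate the = versus << point, here is a minimal plain-Ruby sketch that runs outside Logstash. It assumes the five events listed in the question; EVENTS, aggregate, and RESULT are hypothetical names used only for this illustration, the event hashes stand in for the Logstash event object, and the hash keyed by AC stands in for the aggregate filter's map. It is not the filter's actual implementation, only a rough model of what the code block does when IW is appended rather than assigned.

```ruby
# Stand-in for the five documents read from Elasticsearch (values are strings,
# as in the rubydebug output above).
EVENTS = [
  { "IT" => "1", "AC" => "12345", "p1" => "12345", "p2" => "ok" },
  { "IW" => "1", "AC" => "12345", "A1" => "check1", "E1" => "0" },
  { "IW" => "2", "AC" => "12345", "A1" => "check2", "E1" => "4" },
  { "IT" => "2", "AC" => "7894", "p1" => "245", "p2" => "failed" },
  { "IW" => "4", "AC" => "7894", "A1" => "get", "E1" => "2" }
].freeze

# Hypothetical helper: builds one map per AC value, roughly the way the
# aggregate filter keeps one map per task_id.
def aggregate(events)
  maps = Hash.new { |hash, key| hash[key] = {} }
  events.each do |event|
    map = maps[event["AC"]]
    unless event["IT"].nil?
      # Single-valued fields: plain assignment is enough.
      map["IT"] = event["IT"]
      map["p1"] = event["p1"]
      map["p2"] = event["p2"]
    end
    unless event["IW"].nil?
      # Multi-valued fields: append with << so earlier values are kept.
      (map["IW"] ||= []) << event["IW"]
      (map["A1"] ||= []) << event["A1"]
      (map["E1"] ||= []) << event["E1"]
    end
  end
  maps
end

RESULT = aggregate(EVENTS)
puts RESULT["12345"].inspect
puts RESULT["7894"].inspect
```

In the configuration above, the corresponding change would be to replace map["IW"] = event.get("IW") with map["IW"] << event.get("IW"), keeping the map["IW"] ||= [] line before it.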
