How to aggregate events containing arrays with duplicate hashes in Logstash

The structure of the data is:

{
  brand_id,
  models: [
    {
      model_id,
      model_number,
      model_name
    }
  ]
}

Below is the Logstash output:

C:\Program Files\Elastic\logstash-6.3.0>bin\logstash -w 1 -f model_continue_test.conf
Sending Logstash's logs to C:/Program Files/Elastic/logstash-6.3.0/logs which is now configured via log4j2.properties
[2018-08-15T16:37:13,630][WARN ][logstash.config.source.multilocal] Ignoring the 'pipelines.yml' file because modules or command line options are specified
[2018-08-15T16:37:14,769][INFO ][logstash.runner ] Starting Logstash {"logstash.version"=>"6.3.0"}
[2018-08-15T16:37:20,086][INFO ][logstash.pipeline ] Starting pipeline {:pipeline_id=>"main", "pipeline.workers"=>1, "pipeline.batch.size"=>125, "pipeline.batch.delay"=>50}
[2018-08-15T16:37:20,911][INFO ][logstash.pipeline ] Pipeline started successfully {:pipeline_id=>"main", :thread=>"#<Thread:0x4a1e33f2 run>"}
[2018-08-15T16:37:21,047][INFO ][logstash.agent ] Pipelines running {:count=>1, :running_pipelines=>[:main], :non_running_pipelines=>[]}
[2018-08-15T16:37:21,657][INFO ][logstash.agent ] Successfully started Logstash API endpoint {:port=>9600}

{
        "models" => [
        [0] {
              "model_name" => "test1",
            "model_number" => "abc",
                "model_id" => 1
        },
        [1] {
              "model_name" => "test2",
            "model_number" => "def",
                "model_id" => 2
        }
    ],
    "@timestamp" => 2018-08-15T21:37:22.219Z,
      "@version" => "1",
      "brand_id" => "1"
}
{
        "models" => [
        [0] {
              "model_name" => "test4",
            "model_number" => "sdf",
                "model_id" => 4
        }
    ],
    "@timestamp" => 2018-08-15T21:37:22.233Z,
      "@version" => "1",
      "brand_id" => "5"
}
{
        "models" => [
        [0] {
              "model_name" => "test1",
            "model_number" => "abc",
                "model_id" => 1
        }
    ],
    "@timestamp" => 2018-08-15T21:37:22.233Z,
      "@version" => "1",
      "brand_id" => "1"
}
{
        "models" => [
        [0] {
              "model_name" => "test3",
            "model_number" => "ghi",
                "model_id" => 3
        }
    ],
    "@timestamp" => 2018-08-15T21:37:22.234Z,
      "@version" => "1",
      "brand_id" => "2"
}

[2018-08-15T16:37:23,065][INFO ][logstash.pipeline ] Pipeline has terminated {:pipeline_id=>"main", :thread=>"#<Thread:0x4a1e33f2 run>"}

So, as you can see, for 'brand_id = 1' there are two events, and between them their 'models' arrays contain a duplicate hash. I want to aggregate the values so that each unique brand_id ends up with a single 'models' array of unique hashes, deduplicated by 'model_id'.

I have tried a lot of things with the aggregate filter, but nothing is working. Can anyone please help?

Below is the code I tried. My plan was to aggregate in whatever way possible (with or without duplicates) by brand_id, so that I end up with exactly one event per brand_id, and then remove any duplicate values with a ruby {} filter (a sketch of that dedup step follows the aggregate block below).

aggregate {
  task_id => "%{brand_id}-%{model_id}"
  code => "
    map['brand_id'] = event.get('brand_id')
    map['models'] ||= []
    if event.get('models') != nil
      event.get('models').each do |model|
        map['models'] << {
          'model_number' => model['model_number'],
          'model_id' => model['model_id'],
          'model_name' => model['model_name']
        }
      end
    end
    event.cancel()
  "
  push_previous_map_as_event => true
  timeout => 15
}
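
For the dedup step mentioned above, I was planning a ruby filter along these lines (a sketch, untested; it assumes 'models' is an array of hashes once the events are merged):

ruby {
  code => "
    models = event.get('models')
    if models.is_a?(Array)
      # keep only the first hash seen for each model_id
      event.set('models', models.uniq { |m| m['model_id'] })
    end
  "
}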

That's not going to work; you do not have a model_id field at the top level. I would

split { field => "models" }

and then aggregate using task_id => "%{brand_id}-%{[models][model_id]}"

You'll have to rewrite the insert into models without using .each.
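
Something like this, as a sketch (untested; note that push_previous_map_as_event only merges events whose task_ids arrive back-to-back, so for interleaved data like the output above you may need push_map_as_event_on_timeout instead):

filter {
  split { field => "models" }   # one event per element of the models array

  aggregate {
    task_id => "%{brand_id}-%{[models][model_id]}"
    code => "
      map['brand_id'] ||= event.get('brand_id')
      # after the split, [models] is a single hash, so no .each is needed
      map['models'] ||= [ event.get('models') ]
      event.cancel
    "
    push_previous_map_as_event => true
    timeout => 15
  }
}

This yields one event per brand/model pair, with duplicates collapsed.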

Hi @Badger,

Thank you for your reply. My bad: I pasted the wrong aggregate filter earlier. I was trying many different approaches and copy-pasted the wrong one.

Anyway,

I tried the below, and it did not work:

task_id => "%{brand_id}"

Then I flattened the 'models' array using the ruby filter below:

ruby {
  code => "
    models = event.get('models')

    if models != nil
      models.each do |model|
        # clone first: each clone carries the fields set in the previous
        # iteration (the very first clone has none and is dropped below),
        # and the original event keeps the last model's fields
        new_event_block.call(event.clone)
        event.set('model_id', model['model_id'])
        event.set('model_number', model['model_number'])
        event.set('model_name', model['model_name'])
      end
    end
  "
}

# Remove events that never got a model_id (e.g. the first clone
# emitted for each original event)
if ![model_id] {
  drop {}
}

mutate {
  remove_field => ["models"]
}

After that the output became simple, yet the aggregate filter was still not working as expected. My primary goal is to remove the duplicate events.

The output after flattening:

{
      "@timestamp" => 2018-08-16T01:12:13.436Z,
      "model_name" => "test2",
        "@version" => "1",
    "model_number" => "def",
        "model_id" => 2,
        "brand_id" => "1"
}
{
      "@timestamp" => 2018-08-16T01:12:13.452Z,
      "model_name" => "test4",
        "@version" => "1",
    "model_number" => "sdf",
        "model_id" => 4,
        "brand_id" => "5"
}
{
      "@timestamp" => 2018-08-16T01:12:13.452Z,
      "model_name" => "test1",
        "@version" => "1",
    "model_number" => "abc",
        "model_id" => 1,
        "brand_id" => "1"
}
{
      "@timestamp" => 2018-08-16T01:12:13.452Z,
      "model_name" => "test3",
        "@version" => "1",
    "model_number" => "ghi",
        "model_id" => 3,
        "brand_id" => "2"
}
{
      "@timestamp" => 2018-08-16T01:12:13.436Z,
      "model_name" => "test1",
        "@version" => "1",
    "model_number" => "abc",
        "model_id" => 1,
        "brand_id" => "1"
}

Even though the above looks simple, aggregation was still not working as expected. Can you please help?
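
To illustrate, the kind of aggregate I am attempting on the flattened events looks roughly like this (a sketch, untested; it assumes a single worker, and it pushes the map on timeout because events for the same brand_id are interleaved in the output above):

aggregate {
  task_id => "%{brand_id}"
  code => "
    map['brand_id'] ||= event.get('brand_id')
    map['models'] ||= []
    model = {
      'model_id'     => event.get('model_id'),
      'model_number' => event.get('model_number'),
      'model_name'   => event.get('model_name')
    }
    # skip hashes already collected for this brand_id
    map['models'] << model unless map['models'].include?(model)
    event.cancel
  "
  push_map_as_event_on_timeout => true
  timeout => 15
}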

Assuming you are writing to ES, you can use a fingerprint filter to generate the document_id based on brand_id and model_id. Then, if there is a duplicate, it simply overwrites the document in Elasticsearch with the same data.
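
A minimal sketch of that approach (the hosts and index name are placeholders, not from this thread):

filter {
  fingerprint {
    source => ["brand_id", "model_id"]
    concatenate_sources => true
    method => "SHA1"
    target => "[@metadata][fingerprint]"
  }
}

output {
  elasticsearch {
    hosts => ["localhost:9200"]   # placeholder: your cluster
    index => "models"             # placeholder: your index
    document_id => "%{[@metadata][fingerprint]}"
  }
}

Duplicate brand_id/model_id pairs then map to the same _id and simply overwrite one another.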

Hi @Badger,
Yes, the output is to Elasticsearch, with an 'update' action against an existing index that already has data.
My concern is that a blind update can overwrite with wrong data.
Let me try the 'fingerprint' filter.
Thank you!
