How to aggregate events containing an array with duplicate hashes in Logstash

The structure of the data is:

{
	brand_id,
	{
		models
		[
			{
				model_id,
				model_number,
				model_name
			}
		]
	}
}

Below is the logstash output.

C:\Program Files\Elastic\logstash-6.3.0>bin\logstash -w 1 -f model_continue_test.conf
Sending Logstash's logs to C:/Program Files/Elastic/logstash-6.3.0/logs which is now configured via log4j2.properties
[2018-08-15T16:37:13,630][WARN ][logstash.config.source.multilocal] Ignoring the 'pipelines.yml' file because modules or command line options are specified
[2018-08-15T16:37:14,769][INFO ][logstash.runner ] Starting Logstash {"logstash.version"=>"6.3.0"}
[2018-08-15T16:37:20,086][INFO ][logstash.pipeline ] Starting pipeline {:pipeline_id=>"main", "pipeline.workers"=>1, "pipeline.batch.size"=>125, "pipeline.batch.delay"=>50}
[2018-08-15T16:37:20,911][INFO ][logstash.pipeline ] Pipeline started successfully {:pipeline_id=>"main", :thread=>"#<Thread:0x4a1e33f2 run>"}
[2018-08-15T16:37:21,047][INFO ][logstash.agent ] Pipelines running {:count=>1, :running_pipelines=>[:main], :non_running_pipelines=>[]}
[2018-08-15T16:37:21,657][INFO ][logstash.agent ] Successfully started Logstash API endpoint {:port=>9600}

{
        "models" => [
        [0] {
              "model_name" => "test1",
            "model_number" => "abc",
                "model_id" => 1
        },
        [1] {
              "model_name" => "test2",
            "model_number" => "def",
                "model_id" => 2
        }
    ],
    "@timestamp" => 2018-08-15T21:37:22.219Z,
      "@version" => "1",
      "brand_id" => "1"
}
{
        "models" => [
        [0] {
              "model_name" => "test4",
            "model_number" => "sdf",
                "model_id" => 4
        }
    ],
    "@timestamp" => 2018-08-15T21:37:22.233Z,
      "@version" => "1",
      "brand_id" => "5"
}
{
        "models" => [
        [0] {
              "model_name" => "test1",
            "model_number" => "abc",
                "model_id" => 1
        }
    ],
    "@timestamp" => 2018-08-15T21:37:22.233Z,
      "@version" => "1",
      "brand_id" => "1"
}
{
        "models" => [
        [0] {
              "model_name" => "test3",
            "model_number" => "ghi",
                "model_id" => 3
        }
    ],
    "@timestamp" => 2018-08-15T21:37:22.234Z,
      "@version" => "1",
      "brand_id" => "2"
}

[2018-08-15T16:37:23,065][INFO ][logstash.pipeline ] Pipeline has terminated {:pipeline_id=>"main", :thread=>"#<Thread:0x4a1e33f2 run>"}

As you can see, 'brand_id = 1' appears in 2 events, and the hash with 'model_id = 1' is duplicated across their models arrays. I want to aggregate the events so that each brand_id appears only once, with an associated models array containing unique hashes based on 'model_id'.

I have tried a lot of things with the aggregate filter, but nothing has worked. Can anyone please help?
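To make the target shape concrete, here is a minimal plain-Ruby sketch of the merge described above, run outside Logstash. The sample data mirrors the four events shown earlier; the helper name `merge_by_brand` is hypothetical and only illustrates the grouping and de-duplication rule.

```ruby
# Sample events copied from the Logstash output above (timestamps omitted).
events = [
  { 'brand_id' => '1', 'models' => [
    { 'model_id' => 1, 'model_number' => 'abc', 'model_name' => 'test1' },
    { 'model_id' => 2, 'model_number' => 'def', 'model_name' => 'test2' }
  ] },
  { 'brand_id' => '5', 'models' => [
    { 'model_id' => 4, 'model_number' => 'sdf', 'model_name' => 'test4' }
  ] },
  { 'brand_id' => '1', 'models' => [
    { 'model_id' => 1, 'model_number' => 'abc', 'model_name' => 'test1' }
  ] },
  { 'brand_id' => '2', 'models' => [
    { 'model_id' => 3, 'model_number' => 'ghi', 'model_name' => 'test3' }
  ] }
]

# Hypothetical helper: one result per brand_id, models unique by model_id.
def merge_by_brand(events)
  events
    .group_by { |e| e['brand_id'] }
    .map do |brand_id, group|
      # Collect every model of the brand, then keep the first hash per model_id.
      models = group.flat_map { |e| e['models'] || [] }
                    .uniq { |m| m['model_id'] }
      { 'brand_id' => brand_id, 'models' => models }
    end
end

merged = merge_by_brand(events)
```

Printing `merged` shows one entry per brand, which is the shape the aggregate filter is expected to produce.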

Below is the code I tried. My plan was to aggregate on brand_id in whatever way worked (with or without duplicates), so that I would at least end up with exactly one event per brand_id, and then remove any duplicate values with a ruby {} filter.

aggregate {
	task_id => "%{brand_id}-%{model_id}"
	code => "
		map['brand_id'] = event.get('brand_id')
		map['models'] ||= []
		if event.get('models') != nil
			event.get('models').each do |model|     
				map['models'] << {
					'model_number' => model['model_number'],
					'model_id' => model['model_id'],
					'model_name' => model['model_name']
				}
			end
		end
		event.cancel()
	"
	push_previous_map_as_event => true
	timeout => 15
}

That's not going to work, because you do not have a model_id field at the top level. I would

split { field => "models" }

and then aggregate using task_id => "%{brand_id}-%{[models][model_id]}"

You'll have to rewrite the insert into models without using .each.
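A minimal sketch of how that suggestion might look, assuming the events arrive as shown earlier in the thread and the pipeline runs with a single worker (-w 1). After the split, [models] is a single hash rather than an array, so no .each is needed. Using push_map_as_event_on_timeout instead of push_previous_map_as_event is an assumption of this sketch, not part of the suggestion, and it means the pipeline has to stay up until the timeout elapses.

```conf
filter {
	# One event per element of [models]; [models] becomes a single hash.
	split { field => "models" }

	aggregate {
		# Duplicates of the same brand/model pair share one task and one map.
		task_id => "%{brand_id}-%{[models][model_id]}"
		code => "
			map['brand_id'] ||= event.get('brand_id')
			# Store the model only once per task; later duplicates are ignored.
			map['models'] ||= [ event.get('models') ]
			event.cancel()
		"
		push_map_as_event_on_timeout => true
		timeout => 15
	}
}
```

With this task_id the result is one event per brand/model pair, so duplicates collapse but the models of a brand are not yet grouped into one event.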

Hi @Badger,

Thank you for your reply. My bad, I provided the wrong information in the aggregate filter. I was trying many different approaches and pasted the wrong one while copying.

Anyway,

I tried the following, and it did not work:

task_id => "%{brand_id}"

Then I flattened the 'models' array using the following:

	ruby {
		code => "
			models = event.get('models')

			if (models != nil)
				models.each do |model|
					new_event_block.call(event.clone)
					event.set('model_id', model['model_id'])
					event.set('model_number', model['model_number'])
					event.set('model_name', model['model_name'])
				end
			end
		"
	}

	# Remove those events which do not have model_id
	if ![model_id] {
		drop{}
	}

mutate {
	remove_field => ["models"]
}

After that the output became simpler, yet the aggregate filter is still not working as expected. My primary goal is to remove the duplicate events.

The output after flattening:

{
      "@timestamp" => 2018-08-16T01:12:13.436Z,
      "model_name" => "test2",
        "@version" => "1",
    "model_number" => "def",
        "model_id" => 2,
        "brand_id" => "1"
}
{
      "@timestamp" => 2018-08-16T01:12:13.452Z,
      "model_name" => "test4",
        "@version" => "1",
    "model_number" => "sdf",
        "model_id" => 4,
        "brand_id" => "5"
}
{
      "@timestamp" => 2018-08-16T01:12:13.452Z,
      "model_name" => "test1",
        "@version" => "1",
    "model_number" => "abc",
        "model_id" => 1,
        "brand_id" => "1"
}
{
      "@timestamp" => 2018-08-16T01:12:13.452Z,
      "model_name" => "test3",
        "@version" => "1",
    "model_number" => "ghi",
        "model_id" => 3,
        "brand_id" => "2"
}
{
      "@timestamp" => 2018-08-16T01:12:13.436Z,
      "model_name" => "test1",
        "@version" => "1",
    "model_number" => "abc",
        "model_id" => 1,
        "brand_id" => "1"
}

Even though the above looks simple, aggregation is still not working as expected. Can you please help?
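For reference, a hedged sketch of an aggregate filter over the flattened events above, keyed on brand_id alone and skipping any model_id already collected. One possible reason the earlier attempt did not behave as hoped is that push_previous_map_as_event emits the map every time the task id changes, and the aggregate filter documentation warns that this only works when all events of a task arrive one after the other. Here the brand_id 1 events are interleaved with other brands, so this sketch assumes push_map_as_event_on_timeout instead, a single worker (-w 1), and a pipeline that stays up until the timeout elapses.

```conf
aggregate {
	task_id => "%{brand_id}"
	code => "
		map['brand_id'] ||= event.get('brand_id')
		map['models'] ||= []
		id = event.get('model_id')
		# Append the model only if this model_id has not been seen for the brand.
		unless map['models'].any? { |m| m['model_id'] == id }
			map['models'] << {
				'model_id' => id,
				'model_number' => event.get('model_number'),
				'model_name' => event.get('model_name')
			}
		end
		event.cancel()
	"
	push_map_as_event_on_timeout => true
	timeout => 15
}
```

Each map is pushed as a new event when its task times out, which should give one event per brand_id with a de-duplicated models array.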

Assuming you are writing to Elasticsearch, you can use a fingerprint filter to generate the document_id based on brand_id and model_id. If there is a duplicate, it just overwrites the document in Elasticsearch with the same data.
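A minimal sketch of that idea, applied to the flattened events where brand_id and model_id are top-level fields. The host, index name and key value are placeholders, not taken from this thread. The key is included because, as far as I know, some older versions of the fingerprint plugin require one for the SHA methods; with a key the result is an HMAC rather than a plain hash.

```conf
filter {
	fingerprint {
		source => ["brand_id", "model_id"]
		concatenate_sources => true
		method => "SHA256"
		key => "any-fixed-string"                  # placeholder value
		target => "[@metadata][fingerprint]"      # kept out of the indexed document
	}
}

output {
	elasticsearch {
		hosts => ["localhost:9200"]                # placeholder host
		index => "models"                          # hypothetical index name
		document_id => "%{[@metadata][fingerprint]}"
	}
}
```

Because identical brand_id and model_id pairs always produce the same fingerprint, a duplicate event writes to the same document instead of creating a second one.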

Hi @Badger,
Yes, the output is to Elasticsearch, with an 'update' action against an existing index that already holds data.
My concern is that a blind update could overwrite it with wrong data.
Let me try the 'fingerprint' filter.
Thank you!