The structure of the data is:
{
  brand_id,
  models: [
    {
      model_id,
      model_number,
      model_name
    }
  ]
}
Below is the Logstash output:
C:\Program Files\Elastic\logstash-6.3.0>bin\logstash -w 1 -f model_continue_test.conf
Sending Logstash's logs to C:/Program Files/Elastic/logstash-6.3.0/logs which is now configured via log4j2.properties
[2018-08-15T16:37:13,630][WARN ][logstash.config.source.multilocal] Ignoring the 'pipelines.yml' file because modules or command line options are specified
[2018-08-15T16:37:14,769][INFO ][logstash.runner ] Starting Logstash {"logstash.version"=>"6.3.0"}
[2018-08-15T16:37:20,086][INFO ][logstash.pipeline ] Starting pipeline {:pipeline_id=>"main", "pipeline.workers"=>1, "pipeline.batch.size"=>125, "pipeline.batch.delay"=>50}
[2018-08-15T16:37:20,911][INFO ][logstash.pipeline ] Pipeline started successfully {:pipeline_id=>"main", :thread=>"#<Thread:0x4a1e33f2 run>"}
[2018-08-15T16:37:21,047][INFO ][logstash.agent ] Pipelines running {:count=>1, :running_pipelines=>[:main], :non_running_pipelines=>[]}
[2018-08-15T16:37:21,657][INFO ][logstash.agent ] Successfully started Logstash API endpoint {:port=>9600}
{
"models" => [
[0] {
"model_name" => "test1",
"model_number" => "abc",
"model_id" => 1
},
[1] {
"model_name" => "test2",
"model_number" => "def",
"model_id" => 2
}
],
"@timestamp" => 2018-08-15T21:37:22.219Z,
"@version" => "1",
"brand_id" => "1"
}
{
"models" => [
[0] {
"model_name" => "test4",
"model_number" => "sdf",
"model_id" => 4
}
],
"@timestamp" => 2018-08-15T21:37:22.233Z,
"@version" => "1",
"brand_id" => "5"
}
{
"models" => [
[0] {
"model_name" => "test1",
"model_number" => "abc",
"model_id" => 1
}
],
"@timestamp" => 2018-08-15T21:37:22.233Z,
"@version" => "1",
"brand_id" => "1"
}
{
"models" => [
[0] {
"model_name" => "test3",
"model_number" => "ghi",
"model_id" => 3
}
],
"@timestamp" => 2018-08-15T21:37:22.234Z,
"@version" => "1",
"brand_id" => "2"
}
[2018-08-15T16:37:23,065][INFO ][logstash.pipeline ] Pipeline has terminated {:pipeline_id=>"main", :thread=>"#<Thread:0x4a1e33f2 run>"}
As you can see, for 'brand_id' = 1 there are two events, each carrying its own array of model hashes, and the model with 'model_id' = 1 appears in both. I want to aggregate the events so that each unique 'brand_id' ends up with a single models array containing unique hashes, keyed on 'model_id'.
I have tried several approaches with the aggregate filter, but none of them work. Can anyone please help?
Below is the code I tried. My plan was to first aggregate by brand_id in whatever way possible (with or without duplicates), so that the number of output events matches the number of unique brand_ids, and then use a ruby {} filter to remove any duplicate model hashes.
aggregate {
  task_id => "%{brand_id}-%{model_id}"
  code => "
    map['brand_id'] = event.get('brand_id')
    map['models'] ||= []
    if event.get('models') != nil
      event.get('models').each do |model|
        map['models'] << {
          'model_number' => model['model_number'],
          'model_id' => model['model_id'],
          'model_name' => model['model_name']
        }
      end
    end
    event.cancel()
  "
  push_previous_map_as_event => true
  timeout => 15
}
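For what it's worth, the dedup step I had in mind for the second stage is straightforward in plain Ruby. This is only a sketch outside Logstash, using sample data taken from the output above, to show the result I am after; it is not the filter code itself:

```ruby
# Two events for brand_id = 1, as seen in the Logstash output above
models_a = [
  { 'model_id' => 1, 'model_number' => 'abc', 'model_name' => 'test1' },
  { 'model_id' => 2, 'model_number' => 'def', 'model_name' => 'test2' }
]
models_b = [
  { 'model_id' => 1, 'model_number' => 'abc', 'model_name' => 'test1' }
]

# Concatenate the arrays, then keep only the first hash seen
# for each model_id
merged = (models_a + models_b).uniq { |m| m['model_id'] }
```

So if I can get one event per brand_id (even with duplicated models), a ruby {} filter applying this kind of uniq-by-model_id logic should finish the job.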