Logstash Aggregate map working only for few rows while merging two data sources

I am using the latest version of Logstash (7.6.2). I am trying to merge rows from two different files on a common ID. Samples of the two data sources and my desired output are below.

Sample Data 1 with columns ID, Country, State

111 US NY
112 IN KA
113 US MA

Sample Data 2 with columns ID and Info

111 abc
111 abd
112 xyz
112 xya
113 qwe
113 qwr

Desired Output with merge

111 abc US NY
111 abd US NY
112 xyz IN KA
112 xya IN KA
113 qwe US MA
113 qwr US MA

I have tried using the aggregate filter, as shown in the config file below. I have also set pipeline workers to 1 and disabled Java execution by adding the following settings to logstash.yml:

pipeline.workers: 1
pipeline.java_execution: false

Config File

    if [Country] =~ /.+/ {
        aggregate {
            task_id => "%{ID}"
            code => "
                map['country'] = event.get('Country')
                map['state'] = event.get('State')
                event.cancel()
            "
        }
        drop {}
    }
    if ![Country] {
        aggregate {
            task_id => "%{ID}"
            code => "
                event.set('Country', map['country'])
                event.set('State', map['state'])
            "
        }
    }

I was able to obtain the merged results for the sample, but when I try it on the entire data set, only a few rows get updated with the merged columns instead of all of them.
Am I missing anything here? Any help is appreciated. TIA
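
One possible cause, offered as a guess rather than a confirmed diagnosis: the second aggregate block can only copy what the map already holds, so a data 2 row that reaches the filter before its matching data 1 row would get empty fields. The plain-Ruby sketch below simulates this outside Logstash. `merge_stream` and the `maps` hash are hypothetical stand-ins for the two aggregate blocks and their map store, not plugin APIs.

```ruby
# Plain-Ruby simulation of the two aggregate blocks (not Logstash itself).
# `maps` is a hypothetical stand-in for the aggregate filter's per-task_id maps.
def merge_stream(events)
  maps = Hash.new { |h, k| h[k] = {} }
  output = []
  events.each do |event|
    map = maps[event["ID"]]
    if event["Country"]
      # First block: remember Country/State, then drop the event.
      map["country"] = event["Country"]
      map["state"] = event["State"]
    else
      # Second block: copy whatever the map holds at this moment.
      output << event.merge("Country" => map["country"], "State" => map["state"])
    end
  end
  output
end

data1 = [{ "ID" => "111", "Country" => "US", "State" => "NY" }]
data2 = [{ "ID" => "111", "Info" => "abc" }, { "ID" => "111", "Info" => "abd" }]

# All of data 1 arrives before data 2: every row is enriched.
in_order = merge_stream(data1 + data2)
# One data 2 row arrives before its data 1 row: that row stays empty.
interleaved = merge_stream([data2[0]] + data1 + [data2[1]])

p in_order.map { |e| e["Country"] }     # ["US", "US"]
p interleaved.map { |e| e["Country"] }  # [nil, "US"]
```

If this is what happens on the full data, the rows that do get merged would be the ones whose data 1 row happened to be read first.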


In case anyone is looking for this: I found a workaround!
I used the Elasticsearch filter plugin, as below, to merge the two data sets.
Config File

    elasticsearch {
        hosts => ["http://localhost:9200"]
        index => "mapping_file"
        query => "ID:%{[ID]}"
        fields => { "COUNTRY" => "COUNTRY" }
    }

I first pushed data 1 into Elasticsearch as the index mapping_file, then referenced that index in the Elasticsearch filter plugin while ingesting data 2.
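
Conceptually this is a lookup join: data 1 is loaded in full first, and each data 2 row is then enriched by ID, so arrival order no longer matters. Here is a minimal plain-Ruby sketch of the same idea, using the sample rows from the question. The `DATA1`, `DATA2`, and `lookup` names are made up for illustration, and the in-memory hash only stands in for the mapping_file index.

```ruby
# Sample rows from the question, whitespace-separated.
DATA1 = "111 US NY\n112 IN KA\n113 US MA"
DATA2 = "111 abc\n111 abd\n112 xyz\n112 xya\n113 qwe\n113 qwr"

# Step 1: load all of data 1 into a lookup keyed by ID
# (the role the mapping_file index plays in the workaround).
lookup = {}
DATA1.each_line do |line|
  id, country, state = line.split
  lookup[id] = [country, state]
end

# Step 2: enrich each data 2 row by looking up its ID.
merged = DATA2.each_line.map do |line|
  id, info = line.split
  country, state = lookup.fetch(id, [nil, nil])
  [id, info, country, state].join(" ")
end

puts merged
```

With the sample data this prints the six rows listed under the desired output, starting with `111 abc US NY`.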

Hope it helps!
