I am writing a logstash job/pipeline which takes input from Elasticsearch index(index 1), in filter plugin reads another elasticSearch index(index2). Both of these idices have json data with array of address objects. I want to merge the addresses from both sources, remove duplicates and save back to elasticSearch index1. However the duplicate removal logic, I am getting error in Ruby code block.
My logstash config:
input {
elasticsearch {
hosts => ["http://localhost:9200"]
index => "index1"
docinfo => true
}
}
filter {
# Read the document from carrier-json index
elasticsearch {
hosts => ["http://localhost:9200"]
index => "index2"
query => "_id:%{[uniq_id]}"
fields => { "addresses" => "index2_addresses" }
}
# Merge addresses and remove duplicates
if [index2_addresses] {
ruby {
code => '
existing_addresses = event.get("addresses") || []
new_addresses = event.get("index2_addresses") || []
merged_addresses = (existing_addresses + new_addresses)
merged_addresses.uniq { |addr| addr["address"] + addr["address_2"] + addr["city"] + addr["state"] + addr["zip"] }
event.set("addresses", merged_addresses)
'
}
}
}
output {
stdout {
codec => rubydebug
}
elasticsearch {
hosts => ["http://localhost:9200"]
index => "index1"
document_id => "%{[@metadata][_id]}"
action => "update"
doc_as_upsert => true
}
}
json data in index1
{
"id": "id1",
"addresses": [
{"address": "a11","address_2": "a21","city": "c1","state": "s1","zip": "12345","phone": "9191919191"},
],
"field1": "f1", "field2": "f2", "field3": "f3"
}
json data in index2
{
"id": "a1",
"addresses": [
{"address": "a11","address_2": "a21","city": "c1","state": "s1","zip": "12345","phone": "9191919191"},
{"address": "a21","address_2": "a22","city": "c2","state": "s2","zip": "12346","phone": "9191919192"},
],
"field5": [{"f51", "f52",}], "field6": "f6"
}
exception:
2024-07-21T19:14:00,528][ERROR][logstash.filters.ruby ][main][2b9cf7fde68fdb86bdd62c0c5fff77ac30a16c61c3c0af8b95a8a775dc5123f6] Ruby exception occurred: undefined method `+' for nil:NilClass {:class=>"NoMethodError", :backtrace=>["(ruby filter code):6:in `block in filter_method'", "org/jruby/RubyArray.java:3464:in `uniq'", "(ruby filter code):6:in `block in filter_method'", "/usr/share/logstash/vendor/bundle/jruby/2.5.0/gems/logstash-filter-ruby-3.1.8/lib/logstash/filters/ruby.rb:96:in `inline_script'", "/usr/share/logstash/vendor/bundle/jruby/2.5.0/gems/logstash-filter-ruby-3.1.8/lib/logstash/filters/ruby.rb:89:in `filter'", "/usr/share/logstash/logstash-core/lib/logstash/filters/base.rb:159:in `do_filter'", "/usr/share/logstash/logstash-core/lib/logstash/filters/base.rb:178:in `block in multi_filter'", "org/jruby/RubyArray.java:1821:in `each'", "/usr/share/logstash/logstash-core/lib/logstash/filters/base.rb:175:in `multi_filter'", "org/logstash/config/ir/compiler/AbstractFilterDelegatorExt.java:134:in `multi_filter'", "/usr/share/logstash/logstash-core/lib/logstash/java_pipeline.rb:300:in `block in start_workers'"]}