I am using an indexed script in the output to transform the event
data like this:
input {
kafka {
bootstrap_servers => "kafka.localhost.com:9092"
topics => ["enriched"]
}
}
filter {
json {
source => "message"
remove_field => ["message"]
}
ruby {
code => "
id = event.get('[enrichments][id]')
dedup_id = event.get('dedup_id')
event.set('[@metadata][_id]', id)
event.set('[@metadata][dedup_id]', dedup_id)
"
}
}
output {
elasticsearch {
hosts => ["elasticsearch:9200"]
index => "dedup_store"
document_id => "%{[@metadata][dedup_id]}"
action => "update"
scripted_upsert => true
script_lang => ""
script_type => "indexed"
script => "dedup_upsert_script"
}
}
And here is the indexed script:
curl -X POST "http://localhost:9200/_scripts/dedup_upsert_script" -H 'Content-Type: application/json' -d '
{
"script": {
"lang": "painless",
"source": "if (!ctx._source.containsKey(\"stories\")) { ctx._source.stories = []; } def storyId = params.event['@metadata']['_id']; def createdAt = params.event['created_at']; if (ctx._source.stories.find(s -> s[\"id\"] == storyId) == null) { def newStory = new HashMap(); newStory[\"id\"] = storyId; newStory[\"created_at\"] = createdAt; ctx._source.stories.add(newStory); }"
}
}'
I am getting following error:
:response=>{"update"=>{"_index"=>"dedup_store", "_type"=>"_doc", "_id"=>"B51661CB86F58DF6865EBF00CFC8BDE7_4chan", "status"=>400, "error"=>{"type"=>"illegal_argument_exception", "reason"=>"failed to execute script", "caused_by"=>{"type"=>"script_exception", "reason"=>"runtime error", "script_stack"=>["storyId = params.event['@metadata']['_id']; def ", " ^---- HERE"], "script"=>"dedup_upsert_script", "lang"=>"painless", "position"=>{"offset"=>97, "start"=>75, "end"=>123}, "caused_by"=>{"type"=>"null_pointer_exception", "reason"=>"cannot access method/field [normalizeIndex] from a null def reference"}