Scripted upsert is failing in Elasticsearch output

I am using an indexed script in the output to transform the event data like this:

input {
    kafka {
        bootstrap_servers => "kafka.localhost.com:9092"
        topics => ["enriched"]
    }
}

filter {
    json {
        source => "message"
        remove_field => ["message"]
    }

    ruby {
        code => "

            id = event.get('[enrichments][id]')
            dedup_id = event.get('dedup_id')

            event.set('[@metadata][_id]', id)
            event.set('[@metadata][dedup_id]', dedup_id)

        "
    }    
}

output {
    elasticsearch {
        hosts => ["elasticsearch:9200"]
        index => "dedup_store"
        document_id => "%{[@metadata][dedup_id]}"
        action => "update"
        scripted_upsert => true
        script_lang => ""
        script_type => "indexed"
        script => "dedup_upsert_script"
    }
}

And here is the indexed script:

curl -X POST "http://localhost:9200/_scripts/dedup_upsert_script" -H 'Content-Type: application/json' -d '
{
  "script": {
    "lang": "painless",
    
    "source": "if (!ctx._source.containsKey(\"stories\")) { ctx._source.stories = []; } def storyId = params.event['@metadata']['_id']; def createdAt = params.event['created_at']; if (ctx._source.stories.find(s -> s[\"id\"] == storyId) == null) { def newStory = new HashMap(); newStory[\"id\"] = storyId; newStory[\"created_at\"] = createdAt; ctx._source.stories.add(newStory); }"

  }
}'

I am getting following error:

:response=>{"update"=>{"_index"=>"dedup_store", "_type"=>"_doc", "_id"=>"B51661CB86F58DF6865EBF00CFC8BDE7_4chan", "status"=>400, "error"=>{"type"=>"illegal_argument_exception", "reason"=>"failed to execute script", "caused_by"=>{"type"=>"script_exception", "reason"=>"runtime error", "script_stack"=>["storyId = params.event['@metadata']['_id']; def ", " ^---- HERE"], "script"=>"dedup_upsert_script", "lang"=>"painless", "position"=>{"offset"=>97, "start"=>75, "end"=>123}, "caused_by"=>{"type"=>"null_pointer_exception", "reason"=>"cannot access method/field [normalizeIndex] from a null def reference"}

This topic was automatically closed 28 days after the last reply. New replies are no longer allowed.