Doc_as_upsert along with script in Elasticsearch output plugin

Hello,

I'm using Logstash to index data from Kafka into Elasticsearch. Multiple events can share the same id, so I use doc_as_upsert in the Elasticsearch output to update the existing document with the new values, keyed on _id.

Sample input (in order):

{ "id":5, "status":"new", "time":5, "field1":1}
{ "id":5, "status":"pending", "time":7, "field2":1}
{ "id":5, "status":"processed", "time":9, "field3":1}

Resulting doc in Elasticsearch:
{"id": 5, "status": "processed", "time": 9, "field1": 1, "field2":1, "field3":1}
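To make the expected merge explicit, here is a minimal Python sketch of how successive partial-document updates on the same _id combine for flat fields like the ones above. The function name `apply_partial_updates` is hypothetical and this only models the behavior; it is not how Logstash or Elasticsearch implement it.

```python
def apply_partial_updates(events):
    """Model doc_as_upsert for flat documents: the first event creates the
    document, and each later event overwrites or adds its own fields."""
    doc = {}
    for event in events:
        # Fields in the new event win; fields it does not mention are kept.
        doc.update(event)
    return doc


events = [
    {"id": 5, "status": "new", "time": 5, "field1": 1},
    {"id": 5, "status": "pending", "time": 7, "field2": 1},
    {"id": 5, "status": "processed", "time": 9, "field3": 1},
]
result = apply_partial_updates(events)
print(result)
```

With the sample input, the last event's status and time win, while field1, field2 and field3 all accumulate.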

We have some anomalies where the most recent event has a lower-priority status than the document already in the index.

Example (in order):

{"id": 5, "status": "processed", ...}
{"id": 5, "status": "pending", ...}

This leaves the document with the lower-priority status "pending".

So I've written a Painless script in the Logstash output to prevent lower-priority status values from overwriting higher-priority ones.

script => 'if(ctx._source.status == "processed" && (params.event.get("status") == "pending" || params.event.get("status") == "new")) ctx.op="none"'
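For reference, this is the behavior I'm after, written as a small Python model rather than Painless. It is only a sketch: `guarded_update` is a hypothetical name, and the merge step is written out explicitly here, whereas the script above only sets ctx.op and does not copy any fields itself.

```python
def guarded_update(stored, event):
    """Model of the intended update: skip the whole event when it would
    downgrade a 'processed' document, otherwise merge its fields."""
    if stored is None:
        # No document with this id yet: the event becomes the document (upsert).
        return dict(event)
    if stored.get("status") == "processed" and event.get("status") in ("pending", "new"):
        # Same condition as the script: treat the update as a no-op.
        return stored
    # Otherwise merge: fields from the event overwrite or extend the stored doc.
    return {**stored, **event}


doc = None
for event in [
    {"id": 5, "status": "processed", "time": 9, "field3": 1},
    {"id": 5, "status": "pending", "time": 7, "field2": 1},
]:
    doc = guarded_update(doc, event)
print(doc)
```

In this model the late "pending" event is dropped entirely, so the stored document keeps status "processed" and does not gain field2.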

The upsert worked well before I added the script setting. After adding the script, the update action broke: the first doc is indexed, but no updates occur after that.

Is that a problem with my painless script, or is it a bug?

Thank you

Logstash config:

input {
    file {
        type => "json"
        path => "/usr/share/logstash/test.json"
        start_position => "beginning"
        sincedb_path => "/usr/share/logstash/filedb"
    }
}
filter { json { source => "message" } }
output {
        elasticsearch {
                index => "test_index"
                hosts => "http://127.0.0.1:9200"
                document_id => "%{id}"
                action => "update"
                doc_as_upsert => true
                script => 'if(ctx._source.status == "processed" && (params.event.get("status") == "pending" || params.event.get("status") == "new")) ctx.op="none"'
        }
}
