Conditional update based on priority in Elasticsearch


I'm using Logstash to index data from Kafka into Elasticsearch. There are multiple events with the same id, so I'm using upsert in Logstash to update existing documents with the new values based on the _id.

sample input (in order):

{"id": 5, "status": "pending", "customerID": 7}
{"id": 5, "status": "processed", "processordID": 12}

Resulting document in Elasticsearch:
{"id": 5, "status": "processed", "customerID": 7, "processordID": 12}

We have some anomalies where the most recent event carries a status with a lower priority than the one already stored in the document.

Example (in order):

{"id": 5, "status": "processed"}
{"id": 5, "status": "pending"}

This leaves the document with the lower-priority status "pending".

What I need is some sort of preprocessing in Elasticsearch that prevents the update of the document (a noop) when a lower-priority status tries to overwrite a higher-priority one.
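
To make the intended rule concrete, here is a minimal Python sketch of the behaviour I'm after. It only simulates the merge in memory and does not talk to Elasticsearch. The `STATUS_PRIORITY` table and the `guarded_merge` function are hypothetical names made up for this illustration, and the numeric ranks are an assumption.

```python
# Hypothetical priority table: the thread only mentions these two statuses.
STATUS_PRIORITY = {"pending": 1, "processed": 2}


def guarded_merge(existing, event):
    """Return (document, result) after applying one partial event.

    result is "created", "updated", or "noop".
    """
    if existing is None:
        # First event for this id: behaves like the upsert branch.
        return dict(event), "created"

    old_rank = STATUS_PRIORITY.get(existing.get("status"), 0)
    new_rank = STATUS_PRIORITY.get(event.get("status"), 0)
    if new_rank < old_rank:
        # Lower-priority status arrived late: leave the document untouched.
        return existing, "noop"

    # Partial update: new fields are added, shared fields are overwritten.
    return {**existing, **event}, "updated"


doc = None
for event in [
    {"id": 5, "status": "processed", "processordID": 12},
    {"id": 5, "status": "pending", "customerID": 7},
]:
    doc, result = guarded_merge(doc, event)
    print(result, doc)
```

Note that in this sketch the whole late event is skipped, so its other fields (customerID here) are not merged either.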

Is there some way to do this kind of preprocessing in Elasticsearch?

Logstash configuration:

input {
  kafka {
    bootstrap_servers => "kafka:9092"
    topics => "events"
    group_id => "events"
    client_id => "events"
    decorate_events => true
  }
}
# ... some filters
output {
  elasticsearch {
    index => "events"
    hosts => ""
    document_id => "%{id}"
    action => "update"
    doc_as_upsert => true
  }
}

You could set explicit versions in your documents matching the priority you define.

That way the higher priority won't be overwritten with a lower one.
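
As a rough illustration of the idea, here is a small Python simulation of how an index request with `version_type=external` is accepted or rejected, as far as I understand it: the write goes through only if the supplied version is strictly higher than the stored one. The `STATUS_VERSION` mapping and the function name are hypothetical, and this is an in-memory sketch, not a client call.

```python
class VersionConflict(Exception):
    """Stands in for Elasticsearch's version conflict (HTTP 409) response."""


# Hypothetical mapping from status to the external version sent with it.
STATUS_VERSION = {"pending": 1, "processed": 2}


def index_with_external_version(store, doc_id, doc):
    """Simulate an index request that uses version_type=external."""
    version = STATUS_VERSION[doc["status"]]
    current = store.get(doc_id)
    if current is not None and version <= current["version"]:
        raise VersionConflict(
            f"version {version} is not higher than {current['version']}"
        )
    # The index API replaces the whole document; it does not merge fields.
    store[doc_id] = {"version": version, "source": dict(doc)}


store = {}
index_with_external_version(store, 5, {"id": 5, "status": "processed"})
try:
    index_with_external_version(store, 5, {"id": 5, "status": "pending"})
except VersionConflict as exc:
    print("rejected:", exc)
```

Two things to keep in mind: a second event with the same status would also be rejected under `external` (the `external_gte` version type accepts equal versions), and this applies to index requests, which overwrite the document rather than merging into it.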

I don't have this datatype available in my installation. I am using the OSS version of Elasticsearch.

Is there any other way to achieve a similar behavior without using the version datatype?

You do, it's a core part of Elasticsearch.

When I tried the exact example mapping in the documentation I got:

{
  "error" : {
    "root_cause" : [
      {
        "type" : "mapper_parsing_exception",
        "reason" : "No handler for type [version] declared on field [my_version]"
      }
    ],
    "type" : "mapper_parsing_exception",
    "reason" : "Failed to parse mapping [_doc]: No handler for type [version] declared on field [my_version]",
    "caused_by" : {
      "type" : "mapper_parsing_exception",
      "reason" : "No handler for type [version] declared on field [my_version]"
    }
  },
  "status" : 400
}

I also found it part of the Basic subscription here:
Elastic Subscriptions

So, I deduced that I don't have it.

Apologies, I linked to the wrong doc page.
See and the reference to version there.

Thanks for the suggestion.

I've tried using versioning. Unfortunately, it doesn't work with the update API through Logstash.

I have to use the update API because my data arrives incrementally:

{"id": 5, "status": "pending", "customerID": 7}
{"id": 5, "status": "processed", "processordID": 12}

Resulting document in Elasticsearch:
{"id": 5, "status": "processed", "customerID": 7, "processordID": 12}

If I use the index API, the second event replaces the document and the customerID field is lost:
{"id": 5, "status": "processed", "processordID": 12}

Logstash Error:

[2021-01-25T02:57:53,886][ERROR][logstash.outputs.elasticsearch][main][375cd2bcc47962e4aa3a8e9f0373d50720bbe90b85cd9126eee85cd4adaa71ed] Encountered a retryable error. Will Retry with exponential backoff  {:code=>400, :url=>"", :body=>"{\"error\":{\"root_cause\":[{\"type\":\"illegal_argument_exception\",\"reason\":\"Update requests do not support versioning. Please use `if_seq_no` and `if_primary_term` instead\"}],\"type\":\"illegal_argument_exception\",\"reason\":\"Update requests do not support versioning. Please use `if_seq_no` and `if_primary_term` instead\"},\"status\":400}"}

Did you try that?
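
For context, here is a rough Python sketch of what `if_seq_no` and `if_primary_term` do, as I understand them: the write is applied only if the document still has the sequence number and primary term that the writer saw when it read the document. The function and exception names are hypothetical, and the sequence number handling is simplified.

```python
class Conflict(Exception):
    """Stands in for a version conflict (HTTP 409) response."""


def conditional_write(stored, new_source, if_seq_no, if_primary_term):
    """Apply a write only if the document is unchanged since it was read."""
    seen = (if_seq_no, if_primary_term)
    if (stored["_seq_no"], stored["_primary_term"]) != seen:
        raise Conflict("document changed since it was read")
    return {
        # Simplified: real sequence numbers are assigned per shard.
        "_seq_no": stored["_seq_no"] + 1,
        "_primary_term": stored["_primary_term"],
        "_source": new_source,
    }


stored = {
    "_seq_no": 3,
    "_primary_term": 1,
    "_source": {"id": 5, "status": "pending"},
}

# Writer A read the document at seq_no 3 and writes first.
stored = conditional_write(stored, {"id": 5, "status": "processed"}, 3, 1)

# Writer B also read at seq_no 3; its write is now stale and is rejected.
try:
    conditional_write(stored, {"id": 5, "status": "pending"}, 3, 1)
    stale_rejected = False
except Conflict:
    stale_rejected = True
print("stale write rejected:", stale_rejected)
```

These parameters only detect that the document changed between read and write. They do not compare priorities, so that comparison would still have to happen in whatever client reads the document first.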

I can't see any settings for these parameters in the Logstash Elasticsearch output plugin.
