I'm using Logstash to index data from Kafka into Elasticsearch. Multiple events can share the same id, so I use upsert in Logstash to update the existing document with the new values, keyed on _id.
When a lower priority event arrives after a higher priority one, the upsert overwrites it, which leaves the document with the lower priority status "pending".
What I need is some sort of preprocessing in Elasticsearch that turns the update into a noop whenever a lower priority status tries to overwrite a higher priority one.
Is there some way to preprocess data like this in Elasticsearch?
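To make the rule concrete, here is a minimal sketch in Python of the check I have in mind, plus a request body for a scripted update that would apply it. Everything named here is an assumption for illustration: the priority order (only "pending" being lower is stated above), the names STATUS_PRIORITY, should_apply and build_update_body, and the Painless source, which relies on setting ctx.op to 'none' in the Update API to skip the write. It has not been checked against a specific Elasticsearch version.

```python
# Hypothetical priority order: only "pending" ranking below other statuses
# is known; the numeric values are an assumption for illustration.
STATUS_PRIORITY = {"pending": 0, "processed": 1}


def should_apply(current_status, incoming_status):
    """Return True when the incoming status may overwrite the stored one."""
    return STATUS_PRIORITY[incoming_status] >= STATUS_PRIORITY[current_status]


# Painless sketch of the same rule. Assumes the Update API skips the write
# when the script sets ctx.op to 'none'.
PAINLESS_SOURCE = (
    "if (params.priority[params.event.status] "
    "< params.priority[ctx._source.status]) { "
    "ctx.op = 'none'; "
    "} else { "
    "ctx._source.putAll(params.event); "
    "}"
)


def build_update_body(event):
    """Build an update request body: run the script if the document exists,
    otherwise index the event as-is through the upsert section."""
    return {
        "script": {
            "lang": "painless",
            "source": PAINLESS_SOURCE,
            "params": {"event": event, "priority": STATUS_PRIORITY},
        },
        "upsert": event,
    }


if __name__ == "__main__":
    import json

    # A late "pending" event must not replace an already "processed" document.
    print(should_apply("processed", "pending"))
    print(json.dumps(build_update_body({"id": 5, "status": "pending"}), indent=2))
```

Because the else branch merges the event into the existing _source instead of replacing it, fields that are absent from the incoming event would be kept, which is the behaviour I want from upsert in the first place.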
When I tried the exact example mapping in the documentation I got:
{
  "error" : {
    "root_cause" : [
      {
        "type" : "mapper_parsing_exception",
        "reason" : "No handler for type [version] declared on field [my_version]"
      }
    ],
    "type" : "mapper_parsing_exception",
    "reason" : "Failed to parse mapping [_doc]: No handler for type [version] declared on field [my_version]",
    "caused_by" : {
      "type" : "mapper_parsing_exception",
      "reason" : "No handler for type [version] declared on field [my_version]"
    }
  },
  "status" : 400
}
If I use the index API, I'll have a missing field: {"id": 5, "status": "processed", "processordID": 12}
Logstash Error:
[2021-01-25T02:57:53,886][ERROR][logstash.outputs.elasticsearch][main][375cd2bcc47962e4aa3a8e9f0373d50720bbe90b85cd9126eee85cd4adaa71ed] Encountered a retryable error. Will Retry with exponential backoff {:code=>400, :url=>"http://127.0.0.1:9200/_bulk", :body=>"{\"error\":{\"root_cause\":[{\"type\":\"illegal_argument_exception\",\"reason\":\"Update requests do not support versioning. Please use `if_seq_no` and `if_primary_term` instead\"}],\"type\":\"illegal_argument_exception\",\"reason\":\"Update requests do not support versioning. Please use `if_seq_no` and `if_primary_term` instead\"},\"status\":400}"}