Conditional update to the document

Hello,

I want to update an existing document completely (not partially), but only if it satisfies a condition. See the example below:

Index: twitter
Type: tweet

  1. Add the tweet and lastupdated fields to document 1 as below. Note that the lastupdated field is of type date and holds an epoch value in milliseconds.

    PUT twitter/tweet/1
    {
    "tweet" : "tweet1",
    "lastupdated" : 1478218039000
    }

  2. I want to update the document only if the incoming epoch time is greater than the existing one (currently 1478218039000). I tried using a script, but I got an error saying that doc and script cannot be used together (see the error below). I also tried upsert with a script, but no luck. Below are a couple of requests I tried in vain. Can someone please tell me whether this is possible in Elasticsearch?

Option 1, with a scripted upsert, didn't work. ES returns success but does not update the doc. It looks like it executes either the script or the upsert, not both.

POST /twitter/tweet/1/_update
{
  "scripted_upsert":true,
  "script": {
    "lang": "painless",
    "inline" : "if (ctx._source.lastupdated > params.currdate) { ctx.op = 'noop'}",
    "params": {
      "currdate": 1478219029000
    }
  },
  "upsert" : {
    "tweet" : "tweet2",
    "lastupdated" : 1478219029000
  }
}

Option 2, with script and doc, didn't work either:

POST /twitter/tweet/1/_update
{
  "script": {
    "lang": "painless",
    "inline" : "if (ctx._source.lastupdated > params.currdate) { ctx.op = 'noop'}",
    "params": {
      "currdate": 1478219029000
    }
  },
  "doc" : {
    "tweet" : "tweet2",
    "lastupdated" : 1478219029000
  }
}

Error:

{
  "error": {
    "root_cause": [
      {
        "type": "action_request_validation_exception",
        "reason": "Validation Failed: 1: can't provide both script and doc;"
      }
    ],
    "type": "action_request_validation_exception",
    "reason": "Validation Failed: 1: can't provide both script and doc;"
  },
  "status": 400
}

A lot of other folks seem to have asked about this, but I can't find an answer.
See GitHub and Stack Overflow for a couple of such requests. Thank you in advance for your help.

An update either applies the doc or runs a script; it cannot do both.
I think you're assuming you can use a bit of each (the script to decide whether to make the update a no-op, and the doc to patch in the new data). If you opt for scripted_upsert:true, your script has to do all the work, e.g.:

POST /twitter/tweet/1/_update
{
  "scripted_upsert":true,
  "script": {
    "lang": "painless",
    "inline" : "if (ctx._source.lastupdated > params.currdate) { ctx.op = 'none' } else { ctx._source.tweet = params.tweet; ctx._source.lastupdated = params.currdate }",
    "params": {
      "currdate": 1478219029000,
      "tweet": "tweet2"
    }
  }
}

Mark,

Thank you for the clarification and the example. I should have mentioned that I tried this earlier. While it works, the problem with this approach is that the script gets uglier as the number of fields grows (think hundreds). In our use case we ingest data concurrently, so updates to the same document can arrive at Elasticsearch multiple times and out of order. We want to update the document only if the incoming one is newer, by checking the timestamp as shown in the example above. Also, I was planning to do this via the Java APIs.

  1. Is there a cleaner way of doing this? (thinking of Java/Scala APIs)
  2. Under the hood, is this an atomic operation? Are there two round trip calls or just one?

I hear you on the complex scripts.

Perhaps you could rely on Elasticsearch's version checking instead? Ordinarily it increments the version numbers for you, but perhaps you could supply timestamps as the version numbers yourself?

I am guessing you meant the version number ("_version": ).

  1. Is there a way to supply our own version numbers?
  2. Can you check the existing version number and update the document, in one call, only if the incoming document's timestamp/version is greater than the existing one? In other words, could this lead to a race condition with concurrent requests?

An example would be very useful here. Thank you.


I've not used the feature myself but here's a good place to start: https://www.elastic.co/blog/elasticsearch-versioning-support

Hi,

Have you found a solution for this scenario? We are facing the same challenge of performing conditional updates on documents (also based on a timestamp or a date field), and it doesn't look like there is any viable solution for this at the moment.

Hi Dan,

Try external versioning. Basically, you use the timestamp (epoch) as the version of the document. If the incoming timestamp is not greater than the existing document's version, the request will be rejected.

I might be able to post an example later on, if you still need help.
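
As a rough sketch of the external-versioning idea (not an official client example): the timestamp is passed as the `version` query parameter together with `version_type=external` on a full-document index request, and Elasticsearch rejects the request with a version conflict unless that version is greater than the stored one. The helper names `external_version_request` and `would_accept` are hypothetical, and `would_accept` is only a local model of the rule, not a call into Elasticsearch.

```python
from typing import Any, Dict, Optional, Tuple


def external_version_request(index: str, doc_type: str, doc_id: str,
                             doc: Dict[str, Any], ts_field: str) -> Tuple[str, str, Dict[str, Any]]:
    """Build (method, path, body) for a full-document index request
    that uses the document's timestamp as its external version."""
    version = int(doc[ts_field])
    path = f"/{index}/{doc_type}/{doc_id}?version={version}&version_type=external"
    return "PUT", path, doc


def would_accept(stored_version: Optional[int], incoming_version: int) -> bool:
    """Local model of the external-versioning check: a request is accepted
    when the document is new or the incoming version is strictly greater."""
    return stored_version is None or incoming_version > stored_version


method, path, body = external_version_request(
    "twitter", "tweet", "1",
    {"tweet": "tweet2", "lastupdated": 1478219029000},
    "lastupdated",
)
print(method, path)
```

Because this is a plain index request, the body has to be the complete document; a partial document would overwrite the stored one rather than merge into it.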


Thanks, I have looked into it but the update API does not support external versioning (it is explicitly mentioned in the documentation). So unless I am missing something, it looks like an obvious feature such as "update the document if " is simply missing from ElasticSearch.

Do you need to use the update API? Or can you simply treat it as an insert with a new version?

I do, because I am dealing with partial updates.

Consider the following scenario, where I have an indexed document that looks like this:

{
"_id":9999999,
"name":"John",
"Address":"Some Address",
"Phone":12345568,
"UpdateDate":"2016-01-01T10:00:00.000"
}

And when John's phone number is updated, I will get an incremental update that looks like this (let's assume _id is John's social security number, and it is unique):

{
"_id":9999999,
"Phone":12341234,
"UpdateDate":"2016-09-01T12:00:00.000"
}

So, using the update API I simply update the UpdateDate and Phone of John's record, and that's it.

But what if I get a record with an older UpdateDate than what I already have?

{
"_id":9999999,
"Phone":12344321,
"UpdateDate":"2015-03-06T13:00:00.000"
}

In this case I don't want to do anything, since this is obviously an out of date record, which I want to discard.
I was really hoping it would be possible to use a script like "if (ctx._source.UpdateDate > UpdateDate) { ctx.op = 'none' }" and otherwise fall back to the new partial doc. But if a document already exists, Elasticsearch ignores the attached document and always runs the script. That means I need to encapsulate the entire update logic inside the script, so the script itself has to be programmatically generated from the document I am trying to update, i.e.:

if (ctx._source.UpdateDate > '2015-03-06T13:00:00.000') { ctx.op = 'none' } else { ctx._source.Phone = 12344321; ctx._source.UpdateDate = '2015-03-06T13:00:00.000' }


This is how I started my journey with updates.

I think one of the best options I know of is to perform a complete update with the new data using external versioning instead of a partial update. This way, the check is handled by ES and you don't have to deal with additional logic or locking in your code.

Hi, were you able to solve the issue? I have a similar problem and am looking for a solution for conditional upserts.

Thanks!

The only working solution so far is to auto-generate the update script.

Thanks for the reply, Dan. If possible, can you please send me an example of how you generate the update script?

It's something along the lines of the following:

filter {
  ruby {
    code => 'field_name = "update_script"
             condCol = event.get("condCol")
             arr = []
             script = "if (ctx._source.#{condCol} > params.event.get(\"#{condCol}\")) { ctx.op = \"none\" } else {<update>}"
             event.to_hash.each do |key,value|
              next if key.start_with?("@")
              arr.push("ctx._source.#{key} = params.event.get(\"#{key}\")")
             end
             updates = arr.join(";")
             script.sub!("<update>",updates)
             event.set(field_name,script)'
  }
}

And then in the elasticsearch output plugin:

script => "%{update_script}"

This assumes there is a "condCol" field in the input that describes which field should be used for the condition.
Also, this assumes the data is flat (or that you don't care about nesting) - there is no deep merge going on here, top level values get fully overwritten.
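
The same generation step can be sketched outside Logstash. The Python helper below is a hypothetical equivalent (`build_conditional_update` is not part of any client library); it builds an _update request body whose script skips the update when the stored condition field is newer and otherwise copies every incoming field. It assumes flat documents, plain field names without quotes, and a numeric (epoch) condition field.

```python
from typing import Any, Dict


def build_conditional_update(partial_doc: Dict[str, Any], cond_field: str) -> Dict[str, Any]:
    """Generate an _update body whose script applies partial_doc only if it is newer.

    Assumes flat documents, plain field names, and a numeric condition field.
    """
    # One assignment per incoming field; values are read from params, not inlined.
    assignments = "; ".join(
        f"ctx._source['{key}'] = params.doc['{key}']" for key in sorted(partial_doc)
    )
    source = (
        f"if (ctx._source['{cond_field}'] > params.doc['{cond_field}']) "
        f"{{ ctx.op = 'none' }} else {{ {assignments} }}"
    )
    return {
        "script": {
            "lang": "painless",
            "inline": source,
            "params": {"doc": partial_doc},
        }
    }


body = build_conditional_update(
    {"tweet": "tweet2", "lastupdated": 1478219029000}, "lastupdated"
)
print(body["script"]["inline"])
```

The script text depends only on the set of field names, since the values travel in params.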


I have exactly the same issue. Did anyone find an easy way to work around this? I can't base it on versions.

A clean way to do this without enumerating every field is to wrap the whole document in a wrapper object (e.g. called "document") and then replace the original with ctx._source = params.document. A full example:

POST /name/type/1/_update
{
   "scripted_upsert": true,
   "script" : {
     "inline": "if (ctx._source.timestamp >= params.document.timestamp) { ctx.op = 'none' } else { ctx._source = params.document }",
     "lang": "painless",
     "params": {
        "document": {
          "timestamp": 112255
        }
     }
   },
   "upsert": {
      "timestamp": 33333
   }
}
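
For anyone building this request from code, here is a minimal sketch that wraps an arbitrary document the same way. The function name `full_replace_update` is hypothetical, and seeding the upsert with a timestamp of 0 is my own assumption (it relies on real timestamps always being greater than 0, mirroring the lower upsert timestamp in the request above).

```python
from typing import Any, Dict


def full_replace_update(document: Dict[str, Any], ts_field: str = "timestamp") -> Dict[str, Any]:
    """Build an _update body that replaces the whole stored document
    only when the incoming document carries a newer timestamp."""
    script = (
        f"if (ctx._source.{ts_field} >= params.document.{ts_field}) "
        "{ ctx.op = 'none' } else { ctx._source = params.document }"
    )
    return {
        "scripted_upsert": True,
        "script": {
            "lang": "painless",
            "inline": script,
            "params": {"document": document},
        },
        # Seed used when the document does not exist yet (assumed lower bound).
        "upsert": {ts_field: 0},
    }


body = full_replace_update({"timestamp": 112255, "tweet": "tweet2"})
print(body["script"]["inline"])
```

Note that this replaces the stored document wholesale, so any field missing from the incoming document is dropped.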

Hope this helps


I'm looking for a similar solution too.
Did you find one?

Hi all, I'm looking for the solution to this problem too and I think I got it.

Background information of my situation:
I have one search entity in Elasticsearch whose document is compiled from multiple microservices. For example, the PropertyOwner object has some properties in microservice A and some addresses in microservice B. The goal is to search for a PropertyOwner based on any of that information: property name or address.

Each microservice indexes its own partial document of PropertyOwner; the only thing they have to agree on is the document id, which is the UUID of the PropertyOwner. The fields written by different services do not overlap. The final document looks like this:
{
  "id": "Owner 12345",
  "serviceA_property_names": ["Property 1", "Property 2"],
  "serviceA_timestamp": 123456,
  "serviceB_addresses": ["123 Golden Dr."],
  "serviceB_timestamp": 123789
}

Now suppose two users simultaneously change an owner's address in service B. We want to update the address in the document only if the stored serviceB_timestamp is lower than that of the new request.

Here's how I do it:

POST /name/type/Owner+12345/_update
{
  "scripted_upsert": true,
  "script": {
    "lang": "painless",
    "inline": "if (ctx._source.serviceB_timestamp == null || ctx._source.serviceB_timestamp < params.doc.serviceB_timestamp) { ctx._source.putAll(params.doc) } else { ctx.op = 'none' }",
    "params": {
      "doc": {
        "serviceB_timestamp": 124000,
        "serviceB_addresses": ["567 Silver Rd."]
      }
    }
  },
  "upsert": {}
}

ctx._source is a Map, so putAll here is the standard java.util.Map method; see the Java 11 documentation: https://docs.oracle.com/en/java/javase/11/docs/api/java.base/java/util/Map.html#putAll(java.util.Map)
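
To make the merge semantics concrete, here is a small local model in Python of what the script does to the stored document. It does not talk to Elasticsearch; `merge_if_newer` is a hypothetical name, and treating a missing timestamp as "older" is my own assumption.

```python
from typing import Any, Dict


def merge_if_newer(stored: Dict[str, Any], incoming: Dict[str, Any], ts_field: str) -> Dict[str, Any]:
    """Local model of the update script: merge incoming into stored
    only when incoming carries a newer timestamp for ts_field."""
    current = stored.get(ts_field)
    if current is None or current < incoming[ts_field]:
        merged = dict(stored)
        merged.update(incoming)  # same effect as Map.putAll in the script
        return merged
    return stored


stored = {
    "id": "Owner 12345",
    "serviceA_property_names": ["Property 1", "Property 2"],
    "serviceA_timestamp": 123456,
    "serviceB_addresses": ["123 Golden Dr."],
    "serviceB_timestamp": 123789,
}
newer = {"serviceB_timestamp": 124000, "serviceB_addresses": ["567 Silver Rd."]}
stale = {"serviceB_timestamp": 123000, "serviceB_addresses": ["1 Old St."]}

after_newer = merge_if_newer(stored, newer, "serviceB_timestamp")
after_stale = merge_if_newer(after_newer, stale, "serviceB_timestamp")
print(after_stale["serviceB_addresses"])
```

Only the top-level keys present in the incoming partial document are overwritten, so service A's fields survive an update from service B.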
