Are bulk index operations serialized?

I'm tracking down a synchronization problem at work and wondering whether the following is possible. I'm using Elasticsearch 5.3.0.

Assume I have two bulk requests, each containing (a) exactly one document and (b) the same document (type/id), but with a different payload:

  • time t: bulk request 1 with type1/id1 and payload foo=bar
  • time t+1: bulk request 2 with type1/id1 and payload foo=baz

The expected outcome is that document type1/id1 ends up with the payload foo=baz.

I'm fairly sure that time had already advanced by the time I sent bulk request 2.

If bulk request 1 couldn't be processed immediately and had to be buffered or queued internally, is it possible that bulk request 2 finishes before bulk request 1?

That would effectively leave the final document with foo=bar instead of foo=baz.

I'm aware of versioning, the external version_type, etc., but I want to understand whether this is a case I could actually be hitting here.

thanks!

Basically, what we guarantee is that if something returned OK, we won't lose it. Anything less is a bug that we'd take very seriously. One important thing to check, though, is that _bulk returns 200 OK even if it partially failed, so you have to check the response (the top-level errors flag and each item's status) to be sure.
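A minimal Python sketch of that check, assuming you already have the _bulk response body as parsed JSON. The function name `failed_bulk_items` is hypothetical; the sample body is the version-conflict reply from the curl transcript later in this thread, trimmed. It relies only on the top-level `errors` flag and the per-item `status` and `error` fields visible in those responses.

```python
import json


def failed_bulk_items(response):
    """Return (action, _id, status, error type) for every failed bulk item.

    The HTTP status of the _bulk call can be 200 even when some items
    failed, so both the top-level "errors" flag and each item must be checked.
    """
    if not response.get("errors"):
        return []
    failures = []
    for item in response["items"]:
        # Each item is a one-key dict such as {"index": {...}} or {"create": {...}}.
        action, result = next(iter(item.items()))
        if "error" in result:
            failures.append((action, result.get("_id"), result["status"],
                             result["error"].get("type")))
    return failures


body = """{"took": 0, "errors": true, "items": [{"index": {"_index": "i", "_type": "t",
  "_id": "1", "status": 409, "error": {"type": "version_conflict_engine_exception",
  "reason": "[t][1]: version conflict, current version [2] is different than the one provided [1]"}}}]}"""

print(failed_bulk_items(json.loads(body)))
```

Whether a failed item should be retried, logged, or ignored depends on its error type; a version conflict, for instance, may be exactly the outcome you wanted.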

Concurrent index requests can race each other on the shard's primary. If two concurrent requests touch the same document, it isn't really clear which one will be applied second unless the requests use versioning. You don't need external versioning for that: it's enough to send along the version you read. ES checks it against the current version and bumps the version by one on success, so the second of two requests carrying the same version will fail.

Regardless of whether you use versioning in your requests, we use it internally to keep the primary and replicas in sync. So if you race two index requests, you can't be sure which one will win, but you can be sure that the same one wins on the primary and on the replicas.

Yes, I was checking the bulk return codes, but thanks for the reminder!

Can you explain? Are you suggesting that every time I index a document, I first perform a GET request to get the version number and then send it with the write?

Rarely, but it happens, I have to bulk index literally hundreds of thousands of documents, and I imagine performing a GET for every document would be inconvenient.

That said, my primary source here is a Postgres database, and I think its xmin system column gives me a perfect external version ID. I'll definitely explore this anyway, since from what you suggest and from my own understanding, I'm experiencing exactly this race condition.

Thank you!

That is pretty much what the update API does. It does the read and the versioned write on the primary, which is more efficient than doing it in your application, but it does do it.

When you have an external source of versions like Postgres, the usual approach is to use external versioning. That ought to work well with PostgreSQL.
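A rough Python sketch of a _bulk body that uses external versioning. It assumes the bulk action line accepts `_version` (as in the curl transcript further down) together with `_version_type` set to `external`; check the bulk API documentation for your exact release. `bulk_body_external` is a hypothetical helper, the row below is made up, and the version values would come from your source system, e.g. Postgres. Whether xmin increases with every change in your workload is an assumption to verify before relying on it.

```python
import json


def bulk_body_external(index, doc_type, rows):
    """Build an NDJSON _bulk body that uses external versioning.

    `rows` yields (doc_id, version, source) tuples; `version` is assumed
    to come from the source system and to grow with every change to a row.
    """
    lines = []
    for doc_id, version, source in rows:
        action = {"index": {"_index": index, "_type": doc_type, "_id": doc_id,
                            "_version": version, "_version_type": "external"}}
        lines.append(json.dumps(action))  # action/metadata line
        lines.append(json.dumps(source))  # document source line
    return "\n".join(lines) + "\n"  # _bulk bodies must end with a newline


body = bulk_body_external("i", "t", [("1", 7, {"foo": "baz"})])
print(body, end="")
```

The body can then be POSTed to `_bulk` exactly like the curl examples in this thread; a write whose version is not higher than the stored one is rejected instead of overwriting newer data.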

Coming back to my initial question, but asking it a bit differently: if I send two _bulk requests 2 seconds apart (each indexing only a single document with the same _id but a different payload, and no version), is it technically possible that, due to whatever influence, the first one actually finishes after the second one inside Elasticsearch?

The reason I'm dwelling on this particular point is that I don't want to decide how to improve that part of our application without fully understanding the issue at hand.

thanks!

Yes, that is absolutely possible. Elasticsearch is distributed and concurrent. We do not guarantee that requests are executed in the order they are received.

Let's start with the simple case where both requests hit the node holding the primary shard. When the first request hits the node, it lands on a network thread. That network thread could go to sleep; the second request could then arrive on another network thread, which eventually hands it off to the bulk thread pool, which could dutifully execute it. At this point, the first network thread could wake up and hand the first request off to the bulk thread pool, and only now could its execution proceed.

Or the same thing could happen on the bulk thread pool.

For example, the indexing of the document from the first request could make its way all the way into the engine and then be put to sleep, and then the indexing of the document from the second request could swoop in and steal all the thunder.

And if there's a coordinating node involved, there are even more possibilities for something like this to occur, as there are more places for things to get out of order.

The short version: we do not serialize request execution.
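To make that interleaving concrete, here is a toy Python model, not Elasticsearch code: each request is handled on its own thread, writes are last-writer-wins with no versioning, and a `threading.Event` stands in for the first network thread being put to sleep. All names are illustrative only.

```python
import threading

# Stand-in for the document on the primary: last write wins, no versioning.
doc = {}


def handle(payload, may_proceed=None):
    # A "network thread" that may stall before handing the write off.
    if may_proceed is not None:
        may_proceed.wait()
    doc["type1/id1"] = payload


first_may_proceed = threading.Event()
first = threading.Thread(target=handle, args=({"foo": "bar"}, first_may_proceed))
second = threading.Thread(target=handle, args=({"foo": "baz"},))

first.start()            # bulk request 1 arrives first...
second.start()           # ...bulk request 2 arrives later
second.join()            # request 2 is applied while request 1 is stalled
first_may_proceed.set()  # request 1's thread "wakes up"
first.join()

print(doc["type1/id1"])  # the older payload, {'foo': 'bar'}, won
```

The event only forces deterministically an order that thread scheduling could produce on its own; without versioning, nothing downstream can tell that the surviving write is the stale one.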

Thank you very much for these answers!

I asked the question somewhat speculatively because, in our system, concurrent jobs indexed the same document in different states, and the version from the first job ended up as the final one, when it should have been the one from the second job.

It seems to me the only proper solution here is to rely on a version number generated by the source system and not even on the one from ES, because the window between fetching the current version and sending it with the _bulk request seems prone to race conditions too.

regards,

Markus

You can use the external versioning feature for this.
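A toy Python model of the external-versioning rule, assuming the semantics of version_type=external: a write is accepted only if its version is strictly greater than the stored one, or if the document does not exist yet. `index_external` and `ExternalVersionConflict` are hypothetical names, and a dict stands in for the index. Replaying the two requests from the original question in the wrong order shows the late, older write being rejected instead of overwriting the newer one.

```python
class ExternalVersionConflict(Exception):
    """Stands in for the version conflict Elasticsearch would report."""


def index_external(store, doc_id, version, source):
    """Accept the write only if `version` is newer than what is stored."""
    current = store.get(doc_id)
    if current is not None and version <= current[0]:
        raise ExternalVersionConflict(
            f"{doc_id}: stored version {current[0]}, provided {version}")
    store[doc_id] = (version, source)


store = {}
# The two requests from the question, arriving in the "wrong" order.
index_external(store, "type1/id1", 2, {"foo": "baz"})      # request 2 applied first
try:
    index_external(store, "type1/id1", 1, {"foo": "bar"})  # late request 1
except ExternalVersionConflict as err:
    print("rejected:", err)

print(store["type1/id1"])  # the newer payload survives
```

Because the version comes from the source of truth rather than from a read against Elasticsearch, there is no read-then-write window on the client side.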

You can use optimistic concurrency control for this.

Can you give a small example of how this would work in practice? I'm not sure I understand it well enough to go about an implementation.

Using external versioning adds complexity, so I would prefer an approach without it.

thanks

Sure. Basically, on each indexing request you specify what you think the current version of the document is (you can get it beforehand, for example with a GET). If the version in fact matches, Elasticsearch will execute the write. If the version does not match, Elasticsearch will reject the request. If you have two racing requests, only one of them will win and the other will be rejected. When this happens and the rejected request should be retried (because it carried the most recent update), you have to get the version again and send a new request. You have to track on the client side whether the loser of a race was an older update or the most recent one.

```
$ curl -H "Content-Type: application/json" -XPOST localhost:9200/i/_bulk?pretty=true -d '
{ "create": { "_index": "i", "_type": "t", "_id": "1" } }
{ "f": "v" }
'
{
  "took" : 53,
  "errors" : false,
  "items" : [
    {
      "create" : {
        "_index" : "i",
        "_type" : "t",
        "_id" : "1",
        "_version" : 1,
        "result" : "created",
        "_shards" : {
          "total" : 2,
          "successful" : 1,
          "failed" : 0
        },
        "created" : true,
        "status" : 201
      }
    }
  ]
}
$ curl -H "Content-Type: application/json" -XPOST localhost:9200/i/_bulk?pretty=true -d '
{ "index": { "_index": "i", "_type": "t", "_id": "1", "_version": 1 } }
{ "f": "v" }
'
{
  "took" : 2,
  "errors" : false,
  "items" : [
    {
      "index" : {
        "_index" : "i",
        "_type" : "t",
        "_id" : "1",
        "_version" : 2,
        "result" : "updated",
        "_shards" : {
          "total" : 2,
          "successful" : 1,
          "failed" : 0
        },
        "created" : false,
        "status" : 200
      }
    }
  ]
}
$ curl -H "Content-Type: application/json" -XPOST localhost:9200/i/_bulk?pretty=true -d '
{ "index": { "_index": "i", "_type": "t", "_id": "1", "_version": 1 } }
{ "f": "v" }
'
{
  "took" : 0,
  "errors" : true,
  "items" : [
    {
      "index" : {
        "_index" : "i",
        "_type" : "t",
        "_id" : "1",
        "status" : 409,
        "error" : {
          "type" : "version_conflict_engine_exception",
          "reason" : "[t][1]: version conflict, current version [2] is different than the one provided [1]",
          "index_uuid" : "wVU_5D_ZQ3a9hZxrtBoeVA",
          "shard" : "3",
          "index" : "i"
        }
      }
    }
  ]
}
```
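The same rules can be mirrored in a small local Python model, purely as an illustration: `ToyShard`, `VersionConflict` and `index_with_retry` are hypothetical names, not Elasticsearch or client-library APIs. It assumes internal versioning as described above: a write carrying a version succeeds only if that version equals the stored one, and success bumps the version by one. The retry helper re-reads the version and tries again, which, as noted, is only right when the rejected write carried the most recent update.

```python
class VersionConflict(Exception):
    """Stands in for Elasticsearch's 409 version_conflict_engine_exception."""

    def __init__(self, current, provided):
        super().__init__(f"version conflict, current version [{current}] "
                         f"is different than the one provided [{provided}]")


class ToyShard:
    """Toy model of internal versioning on one primary shard (not ES code)."""

    def __init__(self):
        self.docs = {}  # _id -> (version, source)

    def get(self, doc_id):
        return self.docs[doc_id]

    def index(self, doc_id, source, version=None):
        current = self.docs[doc_id][0] if doc_id in self.docs else None
        # With a version: only write if it matches what is stored right now.
        if version is not None and current != version:
            raise VersionConflict(current, version)
        new_version = 1 if current is None else current + 1
        self.docs[doc_id] = (new_version, source)
        return new_version


def index_with_retry(shard, doc_id, source, attempts=3):
    """Read the current version, write conditionally, re-read on conflict.

    Only appropriate when `source` is known to be the most recent update.
    """
    for _attempt in range(attempts):
        version, _source = shard.get(doc_id)
        try:
            return shard.index(doc_id, source, version=version)
        except VersionConflict:
            continue  # another writer won the race; fetch the new version
    raise RuntimeError("gave up after repeated version conflicts")


shard = ToyShard()
print(shard.index("1", {"f": "v"}))             # first write, like the create: 1
print(shard.index("1", {"f": "v"}, version=1))  # version matches: 2
try:
    shard.index("1", {"f": "v"}, version=1)     # stale version: rejected
except VersionConflict as err:
    print(err)
print(index_with_retry(shard, "1", {"f": "w"}))  # re-reads version 2, writes 3
```

Unlike the real create action, the model's first write does not check that the document is absent; it only mirrors the version numbers in the transcript.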
