Write consistency on a single node cluster

As referenced here:

I'm currently attempting to migrate our elasticsearch data to be 2.0 compatible (i.e. no dots in field names) in preparation for an upgrade from 1.x to 2.x.

I've written a program that runs through the data (which sits in a one-node cluster) in batches, renames the fields, and re-indexes the documents using the Bulk API.

At some point it all goes wrong, and the total number of documents coming back from my query (those still to be "upgraded") stops changing, even though it should be counting down.

Initially I thought that it wasn't working, but when I pick a document and query for it to see whether it's changing, I can see that it is.

However, when I search on a specific field value from that document, I get two results with the same ID. One of the results has the upgraded field, the other does not - so the previous query must be selecting the one with the highest version?

On further inspection I can see that they come from different shards:

{
  "took" : 2,
  "timed_out" : false,
  "_shards" : {
    "total" : 5,
    "successful" : 5,
    "failed" : 0
  },
  "hits" : {
    "total" : 2,
    "max_score" : 19.059433,
    "hits" : [ {
      "_shard" : 0,
      "_node" : "FxbpjCyQRzKfA9QvBbSsmA",
      "_index" : "status",
      "_type" : "status",
      "_id" : "http://static.photosite.com/80018335.jpg",
      "_version" : 2,
      "_score" : 19.059433,
      "_source":{"url":"http://static.photosite.com/80018335.jpg","metadata":{"url.path":["http://www.photosite.com/80018335"],"source":["http://www.photosite.com/80018335"],"longitude":["104.507755"],"latitude":["21.601669"]}},
      ...
    }, {
      "_shard" : 3,
      "_node" : "FxbpjCyQRzKfA9QvBbSsmA",
      "_index" : "status",
      "_type" : "status",
      "_id" : "http://static.photosite.com/80018335.jpg",
      "_version" : 27,
      "_score" : 17.607681,
      "_source":{"url":"http://static.photosite.com/80018335.jpg","metadata":{"url_path":["http://www.photosite.com/80018335"],"source":["http://www.photosite.com/80018335"],"longitude":["104.507755"],"latitude":["21.601669"]}},
      ...
    } ]
  }
}

Why is elasticsearch allowing this? How can I prevent this? What I need is for the re-index to ensure that the data is overwritten. Any help (or alternative methods of upgrading the data) is appreciated.

elasticsearch version: 1.7
no. of nodes: 1
no. of shards total: 17
shards holding my index: 5

Are you by any chance using custom routing?

I'm not, no :frowning:

Which version are you reindexing into exactly?

Can you share more details about how you create the index on the new cluster, and how you are reindexing data?

Something seems to be going wrong around the routing of the documents. Elasticsearch doesn't enforce uniqueness of IDs across different shards. The assumption is that whenever you index a document with the same ID, it will get routed to the same shard and replace the existing one, unless some custom routing is specified, or... well, let's see why this happens to you.

Cheers
Luca

Sure! Happy to share anything that helps.

I'm reading from and writing into the same elasticsearch cluster, obviously - version 1.7.3.

I'm reading using the following query:

{
  "bool" : {
    "must" : {
      "wildcard" : {
        "metadata.url.path" : "*"
      }
    },
    "must_not" : {
      "wildcard" : {
        "metadata.url_path" : "*"
      }
    }
  }
}
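
For completeness, the fetch itself looks roughly like this (a sketch; index/type names match the search response above, and the batch size is 5000):

SearchResponse response = destinationConnection.getClient()
        .prepareSearch("status")
        .setTypes("status")
        .setQuery(QueryBuilders.boolQuery()
                .must(QueryBuilders.wildcardQuery("metadata.url.path", "*"))
                .mustNot(QueryBuilders.wildcardQuery("metadata.url_path", "*")))
        .setSize(5000) // one batch
        .execute().actionGet();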

The code I'm using to write the data back:

BulkRequestBuilder bulkRequest = destinationConnection.getClient().prepareBulk();
for(Map<String, Object> doc : batch.getDocs()){
    XContentBuilder builder;
    try {
        // Rebuild the source, copying every field except "id",
        // which becomes the document _id below.
        builder = XContentFactory.jsonBuilder().startObject();
        for(Map.Entry<String, Object> mapEntry : doc.entrySet()){
            if(!mapEntry.getKey().equals("id")){
                builder.field(mapEntry.getKey(), mapEntry.getValue());
            }
        }
        builder.endObject();
    } catch (IOException e) {
        throw new DocumentBuilderException("Error building request to move items to new parent!", e);
    }

    // Index back under the same _id - no explicit version, no routing.
    bulkRequest.add(destinationConnection.getClient().prepareIndex(destinationIndex, destinationType, (String) doc.get("id")).setSource(builder).request());
}
// Tried with and without setRefresh - same results
BulkResponse response = bulkRequest.setRefresh(true).execute().actionGet();
for(BulkItemResponse itemResponse : response.getItems()){
    if(itemResponse.isFailed()){
        LOG.error("Updating item: {} failed: {}", itemResponse.getFailure().getId(), itemResponse.getFailureMessage());
    }
}
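
I also tried swapping the hand-built bulk request for a BulkProcessor with a listener. That setup looks roughly like this (a minimal sketch against the 1.7 Java client; the listener bodies are trimmed to the logging that produced the output below):

BulkProcessor bulkProcessor = BulkProcessor.builder(
        destinationConnection.getClient(),
        new BulkProcessor.Listener() {
            @Override
            public void beforeBulk(long executionId, BulkRequest request) {
                // log each item about to be written (the "Before" line below)
            }

            @Override
            public void afterBulk(long executionId, BulkRequest request, BulkResponse response) {
                // log each item result (the "After" line below)
            }

            @Override
            public void afterBulk(long executionId, BulkRequest request, Throwable failure) {
                LOG.error("Bulk write failed", failure);
            }
        })
        .setBulkActions(5000) // flush after each batch of 5000
        .build();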

With that in place, here is an example of what I get before and after for a single item:

Before:
Item( id=http://static.photosite.com/20160123_093502.jpg, index=status, type=status, op_type=INDEX, version=-3, parent=null, routing=null )

After:
Item( id=http://static.photosite.com/20160123_093502.jpg, index=status, type=status, op_type=index, version=22)

Note the negative version number beforehand, and the big jump in version afterwards. Of course, it's not actually making a big jump in version - it's writing over a different document from the one it read, and because I've run the program several times the version has just gone up and up.

I'm not sure how one document can have a version of -3, especially as the docs state that the version is always a positive number - and this index was restored from a snapshot, too.

There is no custom versioning, no external versioning. The index mapping was originally created like this:

{
  "status": {
    "dynamic_templates": [
      {
        "metadata": {
          "path_match": "*",
          "match_mapping_type": "string",
          "mapping": {
            "type": "string",
            "index": "not_analyzed"
          }
        }
      }
    ],
    "_source": {
      "enabled": true
    },
    "_all": {
      "enabled": false
    },
    "_id": {
      "path": "url"
    },
    "properties": {
      "nextFetchDate": {
        "type": "date",
        "format": "dateOptionalTime"
      },
      "status": {
        "type": "string",
        "index": "not_analyzed",
        "store": true
      },
      "url": {
        "type": "string",
        "index": "not_analyzed",
        "store": true
      }
    }
  }
}

Is there anything else I can provide that might help?

Other things I've tried since are:

  • Setting replication to 0 on that index.
  • Running it against a 2-node cluster with green health status

Everything behaves in the same way so far.

Even though documents are being processed in batches of 5000, the result count from the query doesn't go down by 5000 each time. In fact, it starts by going down by a few hundred, and that amount gradually shrinks until the total result count stays the same from one batch to the next:

10:43:42.220  INFO : Fetching another batch
10:43:51.701  INFO : Found 9260992 matching documents. Processing 5000...
10:43:51.794  INFO : Total remaining: 9260992
10:43:51.813  INFO : Writing batch of 5000 items
10:43:57.261  INFO : Fetching another batch
10:44:06.136  INFO : Found 9258661 matching documents. Processing 5000...
10:44:06.154  INFO : Total remaining: 9258661
10:44:06.158  INFO : Writing batch of 5000 items
10:44:11.369  INFO : Fetching another batch
10:44:19.790  INFO : Found 9256813 matching documents. Processing 5000...
10:44:19.804  INFO : Total remaining: 9256813
10:44:19.807  INFO : Writing batch of 5000 items
10:44:22.684  INFO : Fetching another batch
10:44:31.182  INFO : Found 9255697 matching documents. Processing 5000...
10:44:31.193  INFO : Total remaining: 9255697
10:44:31.196  INFO : Writing batch of 5000 items
10:44:33.852  INFO : Fetching another batch
10:44:42.394  INFO : Found 9255115 matching documents. Processing 5000...
10:44:42.406  INFO : Total remaining: 9255115
10:44:42.409  INFO : Writing batch of 5000 items
10:44:45.152  INFO : Fetching another batch
10:44:51.473  INFO : Found 9254744 matching documents. Processing 5000...
10:44:51.483  INFO : Total remaining: 9254744
10:44:51.486  INFO : Writing batch of 5000 items
10:44:53.853  INFO : Fetching another batch
10:44:59.966  INFO : Found 9254551 matching documents. Processing 5000...
10:44:59.978  INFO : Total remaining: 9254551
10:44:59.981  INFO : Writing batch of 5000 items
10:45:02.446  INFO : Fetching another batch
10:45:07.773  INFO : Found 9254445 matching documents. Processing 5000...
10:45:07.787  INFO : Total remaining: 9254445
10:45:07.791  INFO : Writing batch of 5000 items
10:45:10.237  INFO : Fetching another batch
10:45:15.679  INFO : Found 9254384 matching documents. Processing 5000...
10:45:15.703  INFO : Total remaining: 9254384
10:45:15.712  INFO : Writing batch of 5000 items
10:45:18.078  INFO : Fetching another batch
10:45:23.660  INFO : Found 9254359 matching documents. Processing 5000...
10:45:23.712  INFO : Total remaining: 9254359
10:45:23.725  INFO : Writing batch of 5000 items
10:45:26.520  INFO : Fetching another batch
10:45:31.895  INFO : Found 9254343 matching documents. Processing 5000...
10:45:31.905  INFO : Total remaining: 9254343
10:45:31.908  INFO : Writing batch of 5000 items
10:45:34.279  INFO : Fetching another batch
10:45:40.121  INFO : Found 9254333 matching documents. Processing 5000...
10:45:40.136  INFO : Total remaining: 9254333
10:45:40.139  INFO : Writing batch of 5000 items
10:45:42.381  INFO : Fetching another batch
10:45:47.798  INFO : Found 9254325 matching documents. Processing 5000...
10:45:47.823  INFO : Total remaining: 9254325

I've also tried increasing the pause time between each batch to 5 seconds. It doesn't make much of a difference.

I'm getting quite desperate at this point! I really need to find a viable way to upgrade our data to be 2.x compliant. I don't want to go the knapsack route, because we have a LOT of data. The program we've written can be started and stopped, or just run in the background.

Something else to note is that it doesn't run on a scroll query, because historically I've had memory issues with that (the program gradually slows down and elasticsearch stops responding, with NoNodeAvailableExceptions).

Any help/suggestions are warmly welcomed!

Wondering if this is somehow screwing it all up. Can anyone answer why it would have a negative version number?

What is your concern exactly? Afraid of large archive files?

This smells like a bug... but in your code

 bulkRequest.add(destinationConnection.getClient().prepareIndex(destinationIndex, destinationType, (String) doc.get("id")).setSource(builder).request());

you do not use setVersion() at all - so how is it possible that your two docs carry versions 2 and 27 respectively? Is it possible your index or your cluster is in an inconsistent state?

This makes me suspicious. Is parent/child in play on either the source or target documents?

Thanks for having a look, guys.

I don't want to go the knapsack route, because we have a LOT of data.

What is your concern exactly? Afraid of large archive files?

I'm dealing with a LOT of data. Not piddly log files. A serious amount of data on our production system. The sheer operation of outputting that amount of data to file, processing those files, and putting it all back in again? I'd rather get it working using queries.

you do not use setVersion() at all - so how is it possible your two docs can carry a version 2 and 27 respectively? Is it possible your index or your cluster is in an inconsistent state?

You don't have to set a version when indexing/overwriting - elasticsearch does that for you. The fact that the second one is already at version 22 is a red herring: I've run the program lots of times, and every time it reads from the first document and writes to the other one, which is why that one's version has gradually climbed to 22.
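
To illustrate the auto-versioning (a toy sketch - index and field names made up):

// Indexing twice with the same _id: elasticsearch increments _version itself.
IndexResponse first = client.prepareIndex("test", "doc", "1")
        .setSource("field", "a").execute().actionGet();
IndexResponse second = client.prepareIndex("test", "doc", "1")
        .setSource("field", "b").execute().actionGet();
// first.getVersion() == 1, second.getVersion() == 2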

No bug in my code as far as I can see. We've used this code before and it's worked fine. This behaviour is new, which is making me think that it's related to the snapshot I restored.

throw new DocumentBuilderException("Error building request to move items to new parent!", e);

This makes me suspicious. Is parent/child in play on either the source or target documents?

That exception catch is belt-and-braces. It never gets thrown in reality, but IOException, which is thrown by the XContentBuilder, isn't a RuntimeException, so I have to handle it somewhere.

Any other ideas?

The negative version numbers were simply because I was not putting setVersion(true) on my query call. D'oh!
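
In other words, the read side just needed this (one-line sketch; query here stands for the bool query posted earlier):

SearchRequestBuilder search = destinationConnection.getClient()
        .prepareSearch("status")
        .setVersion(true) // without this, the _version reported on search hits is not meaningful
        .setQuery(query);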

Executive Summary:

I am an idiot.

Details:

I started today by learning how elasticsearch routes documents to shards.

It turns out that it uses the following formula:

shard = hash(routing) % number_of_primary_shards

By default, routing is the _id of the document, unless you override that when indexing.
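
A toy version of the formula makes the failure mode obvious (illustrative only - Elasticsearch uses its own internal hash, not String.hashCode, and the custom routing value here is hypothetical):

int numberOfPrimaryShards = 5;
String byId = "http://static.photosite.com/80018335.jpg"; // default routing: the _id
String byHost = "static.photosite.com";                   // hypothetical custom routing value
// shard = hash(routing) % number_of_primary_shards
System.out.println(Math.floorMod(byId.hashCode(), numberOfPrimaryShards));
System.out.println(Math.floorMod(byHost.hashCode(), numberOfPrimaryShards));
// Different routing values can land the same _id on different shards.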

Everyone kept asking whether I was using routing, and I was adamant that I was not. And that was the problem!!!

I had restored a snapshot of data. The data in the index I was attempting to upgrade was originally written by a program called stormcrawler.

stormcrawler does use routing to index these documents, but because I wasn't using routing to re-index them, I was creating apparent duplicates on different shards.
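
So the fix on the write side is to carry the routing through on each index request (a sketch - how you recover the routing value depends on how the original writer chose it; doc.get("routing") below is hypothetical):

String routing = (String) doc.get("routing"); // hypothetical: the value the original write routed by
bulkRequest.add(destinationConnection.getClient()
        .prepareIndex(destinationIndex, destinationType, (String) doc.get("id"))
        .setRouting(routing) // route the re-index the same way as the original write
        .setSource(builder)
        .request());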

Once again, elasticsearch rules and I suck.

Sorry for everyone whose time I wasted on this. I'm now going to lie down in a dark room and cry.

Don't be ridiculous.

You're killin' me.

Glad you got it sorted out.

You're exaggerating.

Elasticsearch's fault is that it does not reject un-routed index actions on an obviously routed index. The reason is that there is no concept of a routed index - the cluster state is flying blind here, which is more than unfortunate.

An improved situation could be:

  • create index with a specified property e.g. custom_doc_route: true
  • all document creation actions for that index must have a document route attribute set
  • if no route is set, Elasticsearch should reject the action instead of blindly indexing the document

@jprante - For sure. A minimum-spec dream scenario would be if elasticsearch could tell you how the document was routed as part of its details, i.e.:

{
    _id: "blah",
    _version: "1",
    _routing: "value you routed by",
    ...
}

But I suppose the comeback on that is that there's nothing stopping you from indexing such a field yourself.
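
If you did index it yourself, it's only a couple of lines at write time (sketch; the "routing" field name is my own choice):

IndexRequest indexWithVisibleRouting(Client client, String id, String routing) throws IOException {
    return client.prepareIndex("status", "status", id)
            .setRouting(routing) // the actual routing
            .setSource(XContentFactory.jsonBuilder().startObject()
                    .field("url", id)
                    .field("routing", routing) // duplicated into _source so you can see/query it
                    .endObject())
            .request();
}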