How does replication work in detail?


Assume we have an Elasticsearch cluster with 3 nodes: n1, n2 and n3. We have created an index on this cluster with one shard and two replicas, holding a significant amount of data (one copy of the shard is on each node). Then assume node n1 stops working. After this, new data is added to the index and some old data is deleted from it. After some time node n1 comes up again and the changes to the index are replicated to the shard on n1.

I would be interested in how the replication is implemented. I assume that not all data is transferred, but only the changes made while node n1 was down? Is this realized via some kind of oplog? Is there a maximum number of changes that can be replicated this way, and if that number is exceeded during n1's downtime, is the complete shard replicated instead?

Thank you very much in advance and best regards,
Achim Weigel

It's both better and worse than you think it is. I'm no expert, but it looks like when it's time to assign a new shard this happens:

  1. The current cluster state master reaches out to all nodes and asks "Do you have any data for this shard, and if so, what are the files and what are their checksums?"
  2. When all nodes have replied, the master assigns the shard to the node to which the least data must be sent in the next steps.
  3. The node that holds the master instance (the primary) of that shard starts sending its files to the node that was assigned the shard. During this process the primary's translog isn't truncated, so it can be replayed in its entirety on the newly assigned node. All old files are also kept on the primary's node so they can still be copied, even if they are no longer used there.
  4. Once the files on the new node match the snapshot of the current primary, the primary sends its translog to the new node for replay. It can also remove the old files that it had to keep.
  5. I'm a bit hazy on this - but somehow step 4 is repeated until there aren't any new transactions to replay. It might require a write lock - I'm not sure.
  6. The new node is now caught up. Normal operation can resume.
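
To make steps 1 and 2 concrete, here is a minimal Python sketch of the "who needs the least data" comparison. It is only an illustration of the idea as described above, not Elasticsearch's actual code: the file names, checksums, sizes, the extra node `n4` and all function names are made up for the example.

```python
from typing import Dict, List, Tuple

# file name -> (checksum, size in bytes): a simplified stand-in for the
# per-shard file listing each node reports back in step 1.
FileListing = Dict[str, Tuple[str, int]]


def files_to_copy(primary: FileListing, candidate: FileListing) -> List[str]:
    """Primary files the candidate lacks or holds with a different checksum."""
    return sorted(
        name
        for name, (checksum, _size) in primary.items()
        if name not in candidate or candidate[name][0] != checksum
    )


def bytes_to_copy(primary: FileListing, candidate: FileListing) -> int:
    """Total size of the files that would have to be sent to the candidate."""
    return sum(primary[name][1] for name in files_to_copy(primary, candidate))


def pick_target(primary: FileListing, candidates: Dict[str, FileListing]) -> str:
    """Step 2: choose the node that needs the least data copied."""
    return min(candidates, key=lambda node: bytes_to_copy(primary, candidates[node]))


# Hypothetical listings: n1 came back with a partly stale copy, n4 is empty.
primary = {"_0.cfs": ("aa11", 500), "_1.cfs": ("bb22", 300), "_2.cfs": ("cc33", 200)}
candidates = {
    "n1": {"_0.cfs": ("aa11", 500), "_1.cfs": ("zz99", 300)},
    "n4": {},
}

target = pick_target(primary, candidates)
print(target, files_to_copy(primary, candidates[target]))
```

In this toy case n1 wins because one of its files still matches, but it still has to receive the other two in full before the translog replay of step 4 can start.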

This works really well, except that the rsync-style "make the files the same" step can be really slow. This is because indexing is non-deterministic: if you send a pile of writes to 3 different nodes, they will all build slightly different indexes out of them. The only thing that makes sure the files line up is the process of moving a shard from node to node that I described above. And even then it replays a translog nondeterministically.

So in 1.6 there is a feature called synced flush that lets you skip the file copying step. It works by writing a signed, paper-thin seal on top of the index if it hasn't been written to in several minutes. If both the shard's primary and the node receiving the shard have the seal on the index, and it has the same signature, then you can skip the file copy step and just apply the translog. It should help a ton with rolling restarts in environments where you can either pause writes entirely or where most of the data hasn't changed in a while, like Logstash.
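
The decision that the seal enables can be sketched in a few lines of Python. This is a simplified model of the behaviour described above, not the real implementation: the seal is represented as an optional string called `sync_id` here, and the function and step names are invented for illustration.

```python
from typing import List, Optional


def recovery_plan(primary_sync_id: Optional[str], target_sync_id: Optional[str]) -> List[str]:
    """Return the recovery steps needed, given the seal on each shard copy.

    A value of None models "no seal", for example because the shard was
    written to after the last synced flush.
    """
    if primary_sync_id is not None and primary_sync_id == target_sync_id:
        # Both copies carry the same seal: the files are treated as
        # equivalent, so only the translog has to be replayed.
        return ["replay_translog"]
    # No seal, or the seals differ: fall back to the full file copy.
    return ["copy_files", "replay_translog"]


print(recovery_plan("abc123", "abc123"))
print(recovery_plan("abc123", None))
```

The second call shows why constant writes defeat the feature: as soon as one side has no valid seal, recovery falls back to the slow path.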

The strategy of file identity really sounds critical for us. We have a quite large data set and initialize the index by sending the data in many parallel threads from different systems to reduce the initial build up time. According to your explanations this should result in quite different index files on the different nodes.

Unfortunately the data set changes all the time, so we could not use the new synced flush feature either.

Ours does as well but we're building the ability to pause writes so we can use the synced flush feature.

It's not how you write the data that matters; it's how the instances of the shard (primary and replicas) decide to slice up the data into segments and how they merge those segments. And those can be very different on different nodes.

If you can manage it, it really works well to split your data along time-based lines. With log data it's simple - create an index per day (or so) and then write to the new index on the new day. Synced flush then kicks in automatically once the old index stops receiving writes. With other data you have to get more creative. I can't do it at all for my data.
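
As a small illustration of the index-per-day idea, the helper below derives the index name from a document's timestamp. The `prefix-YYYY.MM.DD` pattern mirrors the naming Logstash uses for its daily indexes; the prefix `logs` and the function name are just placeholders.

```python
from datetime import datetime


def daily_index(prefix: str, when: datetime) -> str:
    """Name of the per-day index a document with this timestamp belongs to."""
    return f"{prefix}-{when:%Y.%m.%d}"


# Documents from different days land in different indexes, so yesterday's
# index stops receiving writes shortly after midnight.
print(daily_index("logs", datetime(2015, 6, 9, 23, 59)))
print(daily_index("logs", datetime(2015, 6, 10, 0, 1)))
```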

There are some hacks around using the allocation API to force a move to make the files line up. It's basically paying the large copy price up front rather than during a recovery.
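
I'm assuming here that the relevant call is the cluster reroute API (`POST /_cluster/reroute`) with a `move` command; the sketch below only builds the JSON body and does not contact a cluster. The index name `myindex` and the node names are placeholders.

```python
import json
from typing import Any, Dict


def move_command(index: str, shard: int, from_node: str, to_node: str) -> Dict[str, Any]:
    """Body for a reroute request that relocates one shard copy to another node."""
    return {
        "commands": [
            {
                "move": {
                    "index": index,
                    "shard": shard,
                    "from_node": from_node,
                    "to_node": to_node,
                }
            }
        ]
    }


# Relocating a copy makes the target node pull a fresh set of files.
body = move_command("myindex", 0, "n2", "n1")
print(json.dumps(body, indent=2))
```

You would send this body with whatever HTTP client you already use, and only at a time when the cluster can afford the full copy.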

There is also the shadow replica feature that works on shared filesystems. I don't know much about it but it could help. I know that you really really really need to have a good shared filesystem. A reliable and fast one.

I suspect there are more tricks that could be played with the translog to prevent the file copy step in more situations, but I'm no expert there and they are just theoretical in any case. You'd have to implement them.