Thanks for the pointers, Jorg,
We use RxJava in our current application, so I'm familiar with
backpressure and ensuring we don't overwhelm target systems. I've been
mulling over the high-level design a bit more. A common approach in
systems that perform multi-region replication is the concept of "log
shipping". It's used heavily in SQL systems for replication, as well as in
systems such as Megastore/HBase. This seems like it would be the most
efficient way to ship data from Region A to Region B with a reasonable
amount of latency. I was thinking something like the following.
Admin Operation Replication
This can get messy quickly. I'm thinking I won't have any sort of "merge"
logic, since that can differ widely between use cases. I was
going to support broadcasting the following operations:
- Index creation
- Index deletion
- Index mapping updates
- Alias index addition
- Alias index removal
This can also get tricky because it assumes that index operations in each
region are unique. Our indexes are Time UUID based, so I know we
won't get conflicts. I won't handle the case of a replayed operation
that conflicts with an existing index; I'll simply log it and drop
it. Handlers could be built in later so users could create their own
resolution logic. Also, these operations must be replayed in a very strict
order. I'm concerned that adding this additional master/master region
communication could result in more load on the master. This can be solved
by running a dedicated master, but I don't really see any other solution.
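To make the replay rules above concrete, here is a minimal sketch in plain Java. It assumes each admin operation carries a sequence number from the source region's log; that sequence number, and every class and method name here, is hypothetical and not part of any Elasticsearch API. It only shows the two rules described: strict ordering, and dropping a create that conflicts with an existing index.

```java
import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
import java.util.Set;

// Hypothetical sketch; none of these names come from Elasticsearch.
class AdminOpReplayer {
    enum Type { CREATE_INDEX, DELETE_INDEX }

    static final class AdminOp {
        final long seq;      // assumed: position in the source region's admin log
        final Type type;
        final String index;

        AdminOp(long seq, Type type, String index) {
            this.seq = seq;
            this.type = type;
            this.index = index;
        }
    }

    private final Set<String> indexes = new HashSet<>();
    private final List<AdminOp> dropped = new ArrayList<>();
    private long lastApplied = 0;

    /**
     * Applies the operation only if it is the next one in sequence.
     * Returns false for a gap or a repeat, so the caller can retry later.
     */
    boolean apply(AdminOp op) {
        if (op.seq != lastApplied + 1) {
            return false; // strict ordering: refuse anything out of sequence
        }
        lastApplied = op.seq;
        if (op.type == Type.CREATE_INDEX) {
            if (!indexes.add(op.index)) {
                // Conflict with an existing index: record it and drop it.
                // A real plugin would log here or call a user-supplied handler.
                dropped.add(op);
            }
        } else {
            indexes.remove(op.index);
        }
        return true;
    }

    boolean hasIndex(String name) { return indexes.contains(name); }

    int droppedCount() { return dropped.size(); }
}
```

The dropped list is where a pluggable conflict handler could later hook in.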
Data Replication
1. Store last sent segments, probably in a system index. Each region could
be offline at different times, so for each segment I'll need to know where
it's been sent.
2. Monitor segments as they're created. I still need to figure this out a
bit more in the context of delayed sending.
Example: Region us-east-1 ES nodes.
We missed sending 5 segments to us-west-1, and they were merged into 1. I
now only need to send the 1 merged segment to us-west-1, since the other 5
segments will be removed.
However, when a merged segment is created in us-east-1 from 5 segments I've
already sent to us-west-1, I won't want to ship that since it will already
contain the data. As the tree is continually merged, I'll need to somehow
sort out what contains shipped data, and what contains unshipped data.
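One way to picture the bookkeeping for the two cases above is the sketch below. It is only an illustration in plain Java, not Lucene or Elasticsearch code, and all names are hypothetical. It assumes the plugin can learn which source segments went into each merge. A merged segment counts as already shipped only when every one of its sources was shipped; the mixed case (some sources shipped, some not) is flagged as needing shipping, which would resend some data and is still an open question.

```java
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;

// Hypothetical sketch; segment and region names are plain strings here.
class SegmentShipTracker {
    // region -> segments whose data is known to have reached that region
    private final Map<String, Set<String>> shippedByRegion = new HashMap<>();

    void markShipped(String region, String segment) {
        shippedByRegion.computeIfAbsent(region, r -> new HashSet<>()).add(segment);
    }

    boolean isShipped(String region, String segment) {
        Set<String> shipped = shippedByRegion.get(region);
        return shipped != null && shipped.contains(segment);
    }

    /**
     * Records that the source segments were merged into one.
     * Returns true if the merged segment still has to be shipped to the region.
     */
    boolean onMerge(String region, String merged, Set<String> sources) {
        Set<String> shipped = shippedByRegion.computeIfAbsent(region, r -> new HashSet<>());
        boolean allShipped = shipped.containsAll(sources);
        shipped.removeAll(sources); // the sources no longer exist after the merge
        if (allShipped) {
            shipped.add(merged);    // target already holds all of this data
            return false;
        }
        return true;                // at least one source never reached the target
    }
}
```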
3. As a new segment is created, perform the following:
3.a) Replay any administrative operations since the last sync on the
index to the target region, so the state is current.
3.b) Push the segment to the target region.
4. The region receives the segment and adds it to its current segments.
When a segment merge happens in the receiving region, this will get merged
On Thursday, January 15, 2015 at 5:29:10 PM UTC-7, Jörg Prante wrote:
While it seems quite easy to attach listeners to an ES node to capture
operations in translog-style and push out index/delete operations on shard
level somehow, there will be more to consider for a reliable solution.
The Couchbase developers have added a data replication protocol to their
product which is meant for transporting changes over long distances with
latency for in-memory processing.
To learn about the most important features, see
GitHub - couchbaselabs/dcp-documentation: couchbase unified protocol for replication
Couchbase SDKs
I think bringing such an inter-cluster protocol concept into ES could
be a good starting point, to sketch the complete path for such an ambitious
project beforehand.
Most challenging could be dealing with back pressure when receiving
nodes/clusters are becoming slow. For a solution to this, reactive Java /
reactive streams look like a viable possibility.
See also
Backpressure · ReactiveX/RxJava Wiki · GitHub
Ratpack - Streams - 2.0.0-rc-1
I'm in favor of Ratpack since it comes with Java 8, Groovy, Google Guava,
and Netty, which has a resemblance to ES.
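As a rough illustration of the back-pressure idea, here is a sketch that uses a bounded queue from java.util.concurrent rather than RxJava or Ratpack. It is a simpler stand-in for reactive streams, not their API, and the class and method names are hypothetical. The sender may only have a fixed number of batches in flight; once the receiving cluster falls behind, enqueueing fails and the sender has to back off.

```java
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;

// Hypothetical sketch; a plain bounded queue standing in for reactive streams.
class BoundedShipQueue {
    private final BlockingQueue<String> pending;

    BoundedShipQueue(int capacity) {
        this.pending = new ArrayBlockingQueue<>(capacity);
    }

    /** Non-blocking; false means the remote side is behind and the caller must back off. */
    boolean tryEnqueue(String batchId) {
        return pending.offer(batchId);
    }

    /** Called when the remote cluster acknowledges the oldest batch; null if none is pending. */
    String ackNext() {
        return pending.poll();
    }

    int inFlight() {
        return pending.size();
    }
}
```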
In ES, for inter cluster communication, there is not much coded afaik,
except snapshot/restore. Maybe snapshot/restore can provide everything you
want, with incremental mode. Lucene will offer numbered segment files for
faster incremental snapshot/restore.
Just my 2¢
On Thu, Jan 15, 2015 at 7:00 PM, Todd Nine wrote:
Hey all,
I would like to create a plugin, and I need a hand. Below are the
requirements I have.
- Our documents are immutable. They are only ever created or
deleted; updates do not apply.
- We want mirrors of our ES cluster in multiple AWS regions. This
way if the WAN between regions is severed for any reason, we do not suffer
an outage, just a delay in consistency.
- As documents are added or removed, they are rolled up and then shipped
in batches to the other AWS regions. This can be as fast as a few
milliseconds, or as slow as minutes, and will be user configurable. Note
that a full backup+load is too slow, this is more of a near realtime
- This will sync the following operations.
- Index creation/deletion
- Alias creation/deletion
- Document creation/deletion
What I'm thinking architecturally.
- The plugin is installed on each node in our cluster in all regions
- The plugin will only gather changes for the primary shards on the
local node
- After the timeout elapses, the plugin will ship the changelog to
the other AWS regions, where the plugin will receive it and process it
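The "gather changes, then ship after a timeout" idea could be sketched as below. This is plain Java with hypothetical names; how the plugin would actually receive change events from ES is exactly the open question in this post, so changes are represented as plain strings and the clock is passed in by the caller.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical sketch; changes are plain strings and time is supplied by the caller.
class ChangeBatcher {
    private final long flushIntervalMillis; // the user-configurable delay
    private final List<String> buffer = new ArrayList<>();
    private long lastFlushMillis;

    ChangeBatcher(long flushIntervalMillis, long nowMillis) {
        this.flushIntervalMillis = flushIntervalMillis;
        this.lastFlushMillis = nowMillis;
    }

    /** Records one change (for example a document create or delete) from a local primary shard. */
    void record(String change) {
        buffer.add(change);
    }

    /**
     * Returns the batch to ship if the interval has elapsed and there is
     * something to send; otherwise returns an empty list.
     */
    List<String> pollBatch(long nowMillis) {
        if (buffer.isEmpty() || nowMillis - lastFlushMillis < flushIntervalMillis) {
            return new ArrayList<>();
        }
        List<String> batch = new ArrayList<>(buffer);
        buffer.clear();
        lastFlushMillis = nowMillis;
        return batch;
    }
}
```

Passing the time in explicitly keeps the sketch easy to test; a real plugin would more likely drive this from a scheduled task.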
Are there any APIs I can look at that are a good starting point for
developing this? I'd like to do a simple prototype with two 1-node clusters
reasonably soon. I found several plugin tutorials, but I'm more concerned
with what part of the ES API I can call to receive events, if any.
You received this message because you are subscribed to the Google Groups
"elasticsearch" group.
To unsubscribe from this group and stop receiving emails from it, send an
email to <javascript:>.
To view this discussion on the web visit
For more options, visit