Help creating a near-real-time streaming plugin to perform replication between clusters

Hey all,
I would like to create a plugin, and I need a hand. Below are the
requirements I have.

  • Our documents are immutable. They are only ever created or deleted,
    updates do not apply.
  • We want mirrors of our ES cluster in multiple AWS regions. This way
    if the WAN between regions is severed for any reason, we do not suffer an
    outage, just a delay in consistency.
  • As documents are added or removed, they are rolled up and shipped in
    batches to the other AWS regions. This can be as fast as a few milliseconds
    or as slow as minutes, and will be user configurable. Note that a full
    backup+load is too slow; this is more of a near-real-time operation.
  • This will sync the following operations.
    • Index creation/deletion
    • Alias creation/deletion
    • Document creation/deletion

What I'm thinking architecturally.

  • The plugin is installed on each node in our cluster in all regions
  • The plugin will only gather changes for the primary shards on the
    local node
  • After the timeout elapses, the plugin will ship the changelog to the
    other AWS regions, where the plugin will receive it and process it
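The gather-then-ship loop in the bullets above could start out roughly like the sketch below. All class and method names here are hypothetical, not an ES API; a real plugin would register a listener on the node to observe primary-shard operations and drive this from a scheduler.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;

// Hypothetical sketch of the per-node changelog batching described above.
public class ChangelogBatcher {

    public static final class Op {
        public final String type;    // e.g. "doc_create", "index_delete"
        public final String target;  // document id, index name, or alias
        public Op(String type, String target) {
            this.type = type;
            this.target = target;
        }
    }

    private final List<Op> pending = new ArrayList<Op>();
    private final long flushIntervalMillis;  // user configurable: ms to minutes
    private long lastFlush;

    public ChangelogBatcher(long flushIntervalMillis, long nowMillis) {
        this.flushIntervalMillis = flushIntervalMillis;
        this.lastFlush = nowMillis;
    }

    // Called for each operation observed on a local primary shard.
    public synchronized void record(Op op) { pending.add(op); }

    // Called periodically: once the interval elapses, the accumulated
    // changelog is handed back for shipping to the other regions.
    public synchronized List<Op> maybeFlush(long nowMillis) {
        if (nowMillis - lastFlush < flushIntervalMillis || pending.isEmpty()) {
            return Collections.emptyList();
        }
        List<Op> batch = new ArrayList<Op>(pending);
        pending.clear();
        lastFlush = nowMillis;
        return batch;
    }
}
```

The flush interval is the user-configurable knob from the requirements: a few milliseconds for near-real-time shipping, minutes for coarser batching.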

Are there any APIs I can look at that are a good starting point for
developing this? I'd like to do a simple prototype with two single-node
clusters reasonably soon. I found several plugin tutorials, but I'm more
concerned with what part of the ES API I can call to receive events, if any.

Thanks,
Todd

--
You received this message because you are subscribed to the Google Groups "elasticsearch" group.
To unsubscribe from this group and stop receiving emails from it, send an email to elasticsearch+unsubscribe@googlegroups.com.
To view this discussion on the web visit https://groups.google.com/d/msgid/elasticsearch/dff53da5-8a0c-4805-8f97-72844019a79e%40googlegroups.com.
For more options, visit https://groups.google.com/d/optout.

While it seems quite easy to attach listeners to an ES node to capture
operations translog-style and somehow push out index/delete operations at
the shard level, there is more to consider for a reliable solution.

The Couchbase developers have added a data replication protocol to their
product, which is meant for transporting changes over long-distance,
high-latency links for in-memory processing.

To learn about the most important features, see

https://github.com/couchbaselabs/dcp-documentation

and

http://docs.couchbase.com/admin/admin/Concepts/dcp.html

I think bringing such an inter-cluster protocol concept into ES could be a
good starting point, and it would be worth sketching the complete path for
such an ambitious project beforehand.

The most challenging part could be dealing with back pressure when
receiving nodes/clusters become slow. Reactive Java / Reactive Streams look
like a viable solution for this.

See also

https://github.com/ReactiveX/RxJava/wiki/Backpressure

http://www.ratpack.io/manual/current/streams.html

I'm in favor of Ratpack since it comes with Java 8, Groovy, Google Guava,
and Netty, which resembles the ES stack.
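The back pressure concern can be illustrated even without a reactive library: a bounded queue between sender and receiver forces a slow target to throttle the source. A minimal stdlib sketch (a stand-in for what Reactive Streams would do properly):

```java
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;

// Bounded buffer between the shipping side and a (possibly slow) receiving
// cluster: when the buffer is full, the sender must back off.
public class BackpressureSketch {
    public static void main(String[] args) {
        // Capacity 2: at most two unshipped batches may be buffered.
        BlockingQueue<String> outbound = new ArrayBlockingQueue<String>(2);

        System.out.println(outbound.offer("batch-1"));  // true
        System.out.println(outbound.offer("batch-2"));  // true

        // Queue full: offer() fails fast instead of piling up batches,
        // signaling the producer to slow down or spill to durable storage.
        System.out.println(outbound.offer("batch-3"));  // false

        // The consumer drains one batch; capacity frees and sending resumes.
        outbound.poll();
        System.out.println(outbound.offer("batch-3"));  // true
    }
}
```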

In ES, there is not much existing code for inter-cluster communication,
AFAIK, except snapshot/restore. Maybe snapshot/restore, with its
incremental mode, can provide everything you want. Lucene will offer
numbered segment files for faster incremental snapshot/restore.

Just my 2¢

Jörg

On Thu, Jan 15, 2015 at 7:00 PM, Todd Nine tnine@apigee.com wrote:


Thanks for the pointers, Jörg.
We use RxJava in our current application, so I'm familiar with
backpressure and ensuring we don't overwhelm target systems. I've been
mulling over the high-level design a bit more. A common approach in
systems that perform multi-region replication is "log shipping". It's used
heavily in SQL systems for replication, as well as in systems such as
Megastore/HBase. This seems like it would be the most efficient way to
ship data from Region A to Region B with a reasonable amount of latency. I
was thinking something like the following.

Admin Operation Replication

This can get messy quickly. I'm thinking I won't have any sort of "merge"
logic, since that can differ greatly for everyone's use case. I was going
to support broadcasting the following operations.

  • Index creation
  • Index deletion
  • Index mapping updates
  • Alias index addition
  • Alias index removal

This can also get tricky because it assumes index operations are unique in
each region. Our indexes are time-UUID based, so I know we won't get
conflicts. I won't handle the case of a replayed operation conflicting
with an existing index; I'll simply log it and drop it. Handlers could be
built in later so users could create their own resolution logic. Also,
this must be replayed in a very strict order. I'm concerned that adding
this additional master/master region communication could result in more
load on the master. This can be solved by running a dedicated master, but
I don't really see any other solution.
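The log-and-drop policy with strict ordering could look something like this. A hypothetical sketch in plain Java, not an ES API; op types and sequencing are illustrative:

```java
import java.util.ArrayList;
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Set;

// Sketch of the "replay in strict order, log and drop conflicts" policy.
public class AdminOpReplayer {

    public static final class AdminOp {
        public final long sequence;      // strict ordering of admin ops
        public final String type;        // "create" or "delete"
        public final String indexName;
        public AdminOp(long sequence, String type, String indexName) {
            this.sequence = sequence;
            this.type = type;
            this.indexName = indexName;
        }
    }

    private final Set<String> existingIndexes = new LinkedHashSet<String>();
    private final List<AdminOp> dropped = new ArrayList<AdminOp>();
    private long lastApplied = -1;

    public void replay(List<AdminOp> ops) {
        for (AdminOp op : ops) {
            if (op.sequence <= lastApplied) continue;   // already applied
            lastApplied = op.sequence;
            if ("create".equals(op.type)) {
                // Conflict with an existing index: log it and drop it;
                // custom resolution handlers could be plugged in here later.
                if (!existingIndexes.add(op.indexName)) dropped.add(op);
            } else if ("delete".equals(op.type)) {
                existingIndexes.remove(op.indexName);
            }
        }
    }

    public Set<String> indexes() { return existingIndexes; }
    public List<AdminOp> droppedOps() { return dropped; }
}
```

The sequence check also makes replay idempotent, which matters when a region replays a batch it partially processed before an outage.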

Data Replication

  1. Store last sent segments, probably in a system index. Each region could
    be offline at different times, so for each segment I'll need to know where
    it's been sent.

  2. Monitor segments as they're created. I still need to figure this out a
    bit more in the context of latent sending.

Example: ES nodes in region us-east-1.

We missed sending 5 segments to us-west-1, and they were merged into 1. I
now only need to send the 1 merged segment to us-west-1, since the other 5
segments will be removed.

However, when a merged segment is created in us-east-1 from 5 segments I've
already sent to us-west-1, I won't want to ship it, since us-west-1 will
already contain the data. As the tree is continually merged, I'll need to
somehow sort out what contains shipped data and what contains unshipped
data.

  3. As a new segment is created, perform the following.
    3.a) Replay any administrative operations since the last sync on the
    index to the target region, so the state is current.
    3.b) Push the segment to the target region.

  4. The region receives the segment and adds it to its current segments.
    When a segment merge happens in the receiving region, it will get merged
    in.

Thoughts?

On Thursday, January 15, 2015 at 5:29:10 PM UTC-7, Jörg Prante wrote:


This looks promising.

For admin operations, see also the tribe node. A special "replication-aware
tribe node" (or maybe more than one tribe node for resiliency) could
supervise the cluster-to-cluster replication.

For the segment strategy, I think it is hard to go down to the level of the
index store, capture the files properly, and put them over the wire to a
target. It would be better to replicate at the shard level, maybe by
reusing some of the code in
org.elasticsearch.index.snapshots.IndexShardSnapshotAndRestoreService, so
that a tribe node can trigger a snapshot action on the source cluster
master, open a transactional connection from a node in the source cluster
to a node in the target cluster, and place a restore action on a queue on
the target cluster master, plus rollback logic if the shard transaction
fails. In short, the ES cluster-to-cluster replication process could be
realized by a "primary shard replication protocol".

Just my 2¢

Jörg

On Fri, Jan 23, 2015 at 7:42 PM, Todd Nine tnine@apigee.com wrote:


Thanks for the suggestion on the tribe nodes. I'll take a more in-depth
look at org.elasticsearch.index.snapshots.IndexShardSnapshotAndRestoreService.
A reference implementation would be helpful in understanding its usage; do
you happen to know of any projects that use it?

From an architecture perspective, I'm concerned with having the cluster
master initiate any replication operations aside from replaying index
modifications. As we continue to increase our cluster size, I'm worried it
may become too much load for the master to keep up. Our system is getting
larger every day; we have 12 c3.4xl instances in each region currently.
Our client to ES is a multi-tenant system
(http://usergrid.incubator.apache.org/), so each application created in the
system gets its own indexes in ES. This allows us to scale the indexes with
read/write aliases according to each application's usage.

To take a step back even further, is there a way we can use something
existing in ES to perform this work, possibly with routing rules, etc.? My
primary concerns are that we don't have to query across regions, can
recover from a region or network outage, and that replication can resume
once communication between regions is restored. I seem to be venturing
into uncharted territory by thinking I need to create a plugin, and I
doubt I'm the first user to encounter such a problem. If there are any
other known solutions, that would be great. I just need our replication to
run every N seconds.

Thanks again!
Todd

On Friday, January 23, 2015 at 2:43:08 PM UTC-7, Jörg Prante wrote:


The IndexShardSnapshotAndRestoreService is from the snapshot/restore
feature. It allows pushing snapshots to shared file storage, and restore
retrieves snapshots and places them into the current cluster. With
snapshot/restore, an "off-line" synchronization utility already exists.

The idea is now

  • Let the master supervise a queue of inter-cluster snapshot/restore
    operations. This could be stored in the cluster state as custom data.

  • Instead of using shared file storage as an intermediary, the snapshots
    are pushed directly to another cluster (with the help of the tribe node
    mechanism). It is not the master doing the work, but one or more
    tribe-node workers that consume the queued operations.

  • On the target cluster, the workers consume "receive" operations and
    write the snapshots into the running cluster.

By using an RxJava-style API for the cluster-to-cluster communication (I
can only imagine a modified tribe node here), the replication process
could be made more adaptive. For example, the inter-cluster
snapshot/restore queue could be throttled when the target cluster becomes
slow.

If a tribe node is not suitable, a transport client mechanism could also
do the job.
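A minimal sketch of that throttling signal, using nothing but a bounded queue (the class and operation strings are made up for illustration; this is not an ES or RxJava API):

```java
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;

// Hypothetical sketch of the adaptive-throttling idea: a bounded queue
// of inter-cluster snapshot/restore operations. When the target cluster
// is slow and the worker stops draining, the queue fills and submit()
// starts failing, which is the producer's signal to back off.
public class ReplicationQueue {
    private final BlockingQueue<String> ops;

    public ReplicationQueue(int capacity) {
        this.ops = new ArrayBlockingQueue<>(capacity);
    }

    /** Producer side: returns false when the queue is full -> throttle. */
    public boolean submit(String op) {
        return ops.offer(op);
    }

    /** Worker side (e.g. a tribe-node consumer): null when nothing pending. */
    public String next() {
        return ops.poll();
    }

    public int pending() {
        return ops.size();
    }
}
```

A real implementation would replace the queue draining with the transport to the target cluster; the bounded buffer is only there to make the backpressure explicit.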

Jörg

On Fri, Jan 23, 2015 at 11:56 PM, Todd Nine tnine@apigee.com wrote:

Thanks for the suggestion on the tribe nodes. I'll take a look
at org.elasticsearch.index.snapshots.IndexShardSnapshotAndRestoreService
in more depth. A reference implementation would be helpful in
understanding its usage; do you happen to know of any projects that use
it?

From an architecture perspective, I'm concerned about having the cluster
master initiate any replication operations aside from replaying index
modifications. As we continue to increase our cluster size, I'm worried it
may become too much load for the master to keep up. Our system is getting
larger every day; we have 12 c3.4xl instances in each region currently.
Our client to ES is a multi-tenant system (
http://usergrid.incubator.apache.org/), so each application created in
the system gets its own indexes in ES. This allows us to scale the
indexes using read/write aliases according to each application's usage.
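As an illustration of that read/write alias pattern (a toy registry with made-up names, not the ES alias API): the write alias targets only the newest index for an application, while the read alias spans every index the application has written.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Illustrative sketch of per-application read/write aliases.
public class AliasRegistry {
    private final Map<String, List<String>> readAliases = new HashMap<>();
    private final Map<String, String> writeAliases = new HashMap<>();

    /** Roll the application over to a new index. */
    public void addIndex(String app, String index) {
        readAliases.computeIfAbsent(app, k -> new ArrayList<>()).add(index);
        writeAliases.put(app, index); // writes always go to the newest index
    }

    public String writeIndex(String app) {
        return writeAliases.get(app);
    }

    /** Reads fan out over every index the application has ever used. */
    public List<String> readIndices(String app) {
        return readAliases.getOrDefault(app, Collections.emptyList());
    }
}
```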

To take a step back even further, is there a way we can use something
existing in ES to perform this work, possibly with routing rules, etc.? My
primary concerns are that we don't have to query across regions, that we
can recover from a region or network outage, and that replication can
resume once communication between regions is restored. I seem to be
venturing into uncharted territory by thinking I need to create a plugin,
and I doubt I'm the first user to encounter such a problem. If there are
any other known solutions, that would be great. I just need our
replication time to be every N seconds.

Thanks again!
Todd

On Friday, January 23, 2015 at 2:43:08 PM UTC-7, Jörg Prante wrote:

This looks promising.

For admin operations, see also the tribe node. A special
"replication-aware tribe node" (or maybe more than one tribe node for
resiliency) could supervise the cluster-to-cluster replication.

For the segment strategy, I think it is hard to go down to the level of
the index store, capture the files properly, and put them over the wire to
a target. It would be better to replicate at the shard level, maybe by
reusing some of the code of
org.elasticsearch.index.snapshots.IndexShardSnapshotAndRestoreService, so
that a tribe node can trigger a snapshot action on the source cluster
master, open a transactional connection from a node in the source cluster
to a node in the target cluster, and place a restore action on a queue on
the target cluster master, plus rollback logic if the shard transaction
fails. In short, the ES cluster-to-cluster replication process could be
realized by a "primary shard replication protocol".

Just my 2¢

Jörg

On Fri, Jan 23, 2015 at 7:42 PM, Todd Nine tn...@apigee.com wrote:

Thanks for the pointers, Jörg.
We use RxJava in our current application, so I'm familiar with
backpressure and ensuring we don't overwhelm target systems. I've been
mulling over the high-level design a bit more. A common approach in
systems that perform multi-region replication is "log shipping". It's used
heavily in SQL systems for replication, as well as in systems such as
Megastore/HBase. This seems like it would be the most efficient way to
ship data from Region A to Region B with a reasonable amount of latency. I
was thinking something like the following.

Admin Operation Replication

This can get messy quickly. I'm not planning any sort of "merge" logic,
since that varies widely by use case. I was going to support broadcasting
the following operations.

  • Index creation
  • Index deletion
  • Index mapping updates
  • Alias index addition
  • Alias index removal

This can also get tricky because it assumes index operations are unique in
each region. Our indexes are time-UUID based, so I know we won't get
conflicts. I won't handle the case of a replayed operation conflicting
with an existing index; I'll simply log it and drop it. Handlers could be
built in later so users could create their own resolution logic. Also,
this must be replayed in a very strict order. I'm concerned that adding
this additional master/master region communication could result in more
load on the master. This can be mitigated by running a dedicated master,
but I don't really see any other solution.
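The replay rule described above (strict sequence order, log-and-drop on conflict) could look roughly like this; all names are hypothetical:

```java
import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
import java.util.Set;

// Hypothetical sketch of admin-op replay: operations carry a sequence
// number and are applied in strict order; an operation that conflicts
// with an existing index is recorded and dropped, with no merge logic.
public class AdminOpLog {
    private final Set<String> indices = new HashSet<>();
    private final List<String> dropped = new ArrayList<>();
    private long nextSeq = 0;

    /** Apply one replicated operation; returns false if it was dropped. */
    public boolean replay(long seq, String op, String index) {
        if (seq != nextSeq) {
            throw new IllegalStateException("out-of-order op: " + seq);
        }
        nextSeq++;
        if (op.equals("create") && indices.contains(index)) {
            dropped.add(index); // conflict: log and drop
            return false;
        }
        if (op.equals("create")) indices.add(index);
        if (op.equals("delete")) indices.remove(index);
        return true;
    }

    public List<String> droppedOps() {
        return dropped;
    }
}
```

The hook for user-defined resolution logic would replace the log-and-drop branch.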

Data Replication

  1. Store last sent segments, probably in a system index. Each region
     could be offline at different times, so for each segment I'll need to
     know where it's been sent.

  2. Monitor segments as they're created. I still need to figure this out
     a bit more in the context of latent sending.

     Example: Region us-east-1 ES nodes. We missed sending 5 segments to
     us-west-1, and they were merged into 1. I now only need to send the
     1 merged segment to us-west-1, since the other 5 segments will be
     removed. However, when a merged segment is created in us-east-1 from
     5 segments I've already sent to us-west-1, I won't want to ship it,
     since us-west-1 will already contain the data. As the tree is
     continually merged, I'll need to somehow sort out what contains
     shipped data and what contains unshipped data.

  3. As a new segment is created, perform the following:
     3.a) Replay any administrative operations since the last sync on the
     index to the target region, so the state is current.
     3.b) Push the segment to the target region.

  4. The receiving region adds the segment to its current segments. When
     a segment merge happens in the receiving region, it will get merged
     in.
Thoughts?

On Thursday, January 15, 2015 at 5:29:10 PM UTC-7, Jörg Prante wrote:

While it seems quite easy to attach listeners to an ES node to capture
operations in translog-style and push out index/delete operations on shard
level somehow, there will be more to consider for a reliable solution.

The Couchbase developers have added a data replication protocol to
their product which is designed to transport changes over long,
high-latency distances for in-memory processing.

To learn about the most important features, see

https://github.com/couchbaselabs/dcp-documentation

and

http://docs.couchbase.com/admin/admin/Concepts/dcp.html

I think bringing such an inter-cluster protocol concept into ES could be a
good starting point; it would be wise to sketch the complete path for such
an ambitious project beforehand.

The most challenging part could be dealing with backpressure when
receiving nodes/clusters become slow. For a solution to this, reactive
Java / reactive streams look like a viable option.

See also

https://github.com/ReactiveX/RxJava/wiki/Backpressure

http://www.ratpack.io/manual/current/streams.html

I'm in favor of Ratpack since it comes with Java 8, Groovy, Google
Guava, and Netty, which has a resemblance to ES.

In ES, for inter cluster communication, there is not much coded afaik,
except snapshot/restore. Maybe snapshot/restore can provide everything you
want, with incremental mode. Lucene will offer numbered segment files for
faster incremental snapshot/restore.

Just my 2¢

Jörg

On Thu, Jan 15, 2015 at 7:00 PM, Todd Nine tn...@apigee.com wrote:


Hi,

A common approach for replicating changes across multiple geographically
distributed clusters is to put a message queue in front of Elasticsearch
and feed all data modifications through it, so that they can be applied to
each cluster independently. This handles issues with unreliable
connections, as long as the message queue is able to hold enough events.

Any solution based on snapshot and restore would most likely require a
single master cluster for each index. If updates could be applied to any
cluster, data could be lost when a snapshot from another cluster is
restored.
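A toy sketch of that fan-out, one queue per cluster (illustrative only; a real deployment would use a durable queue such as Kafka or SQS rather than in-memory deques):

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical sketch: every modification is fanned out to one queue per
// cluster, and each cluster applies its own queue independently, so an
// offline region simply accumulates a backlog instead of losing data.
public class ChangeFanout {
    private final Map<String, Deque<String>> perCluster = new HashMap<>();

    public ChangeFanout(List<String> clusters) {
        for (String c : clusters) perCluster.put(c, new ArrayDeque<>());
    }

    /** Producer side: publish a change to every cluster's queue. */
    public void publish(String change) {
        perCluster.values().forEach(q -> q.addLast(change));
    }

    /** Consumer side: each cluster drains its own queue at its own pace. */
    public List<String> drain(String cluster) {
        Deque<String> q = perCluster.get(cluster);
        List<String> batch = new ArrayList<>(q);
        q.clear();
        return batch;
    }

    public int backlog(String cluster) {
        return perCluster.get(cluster).size();
    }
}
```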

Regards,

Christian

On Sunday, 25 January 2015 13:15:45 UTC, Jörg Prante wrote:


--
You received this message because you are subscribed to the Google Groups "elasticsearch" group.
To unsubscribe from this group and stop receiving emails from it, send an email to elasticsearch+unsubscribe@googlegroups.com.
To view this discussion on the web visit https://groups.google.com/d/msgid/elasticsearch/46e162d2-1a73-4d9d-ac9b-2063f96bedce%40googlegroups.com.
For more options, visit https://groups.google.com/d/optout.