Inner River Plugin...?

Hey all,

I'm just starting with ES, and I must say it is a beautiful piece of
software!

I will be using it as my main store for a health-related system. The
information that ES will deal with is a bit more sensitive than in other
scenarios, so my main concern is to guarantee its persistence.

In order to do that, I will additionally store all the info in a
postgres instance, which will serve its purpose in case (plan C) I have to
rebuild the indexes from scratch. I also want to take advantage of
postgres's PLV8 language integration to normalize some JSON data that is
convenient to deal with in a relational fashion.

Now, I know I could just add the logic to do this right in the app, but:

  • I would have to be extra careful to guarantee that both ES and postgres
    have the same info in sync
  • It would add an overhead to the app and its development when I only
    care about ES access from there

Using a River is not an option, since postgres is not my main data access
point, nor do I intend it to be.

What I came up with was to create some sort of ES plugin that would catch
all CUD operations straight from the engine and persist them to postgres from
there. This way I can guarantee that all operations that ES receives and
stores will effectively be backed up to postgres, so if I keep my ES
operations atomic and consistent from the app, they will be so for postgres too.

What I have so far is a plugin that:

  • Binds an implementation of IndexingOperationListener
  • Catches all operations executed against each primary shard
  • Puts them in a transactional queue (currently implemented with
    Berkeley DB JE)
  • Has a worker thread that constantly reads from the queue in batches
    and transactionally updates postgres
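
To make the queue-and-worker part of the list above concrete, here is a minimal sketch using only JDK classes. Everything in it is an assumption for illustration: `InnerRiverSketch`, `Op` and `BatchSink` are hypothetical names, not part of Elasticsearch or of the actual plugin, the in-memory `LinkedBlockingQueue` stands in for the transactional Berkeley DB JE queue, and `BatchSink` stands in for the code that would write a batch to postgres inside one transaction.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;

// Hypothetical sketch: none of these names come from Elasticsearch or the plugin.
class InnerRiverSketch {

    /** One captured create/update/delete operation (assumed shape). */
    static final class Op {
        final String type; // e.g. "index" or "delete"
        final String id;
        final String json; // null for deletes
        Op(String type, String id, String json) {
            this.type = type;
            this.id = id;
            this.json = json;
        }
    }

    /** Stand-in for the code that writes one batch to postgres in a single transaction. */
    interface BatchSink {
        void writeBatch(List<Op> batch);
    }

    // In-memory stand-in for the durable queue; it does NOT survive a crash.
    private final BlockingQueue<Op> queue = new LinkedBlockingQueue<>();
    private final BatchSink sink;
    private final int maxBatch;

    InnerRiverSketch(BatchSink sink, int maxBatch) {
        this.sink = sink;
        this.maxBatch = maxBatch;
    }

    /** Called from the listener hook; only enqueues, so the indexing thread is not blocked. */
    void enqueue(Op op) {
        queue.add(op);
    }

    /**
     * Waits up to waitMillis for the first operation, then drains up to maxBatch
     * operations and hands them to the sink. Returns the number of operations written.
     * A durable queue would remove the batch only after the sink has committed.
     */
    int drainOnce(long waitMillis) {
        Op first;
        try {
            first = queue.poll(waitMillis, TimeUnit.MILLISECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt(); // preserve the interrupt for the caller
            return 0;
        }
        if (first == null) {
            return 0;
        }
        List<Op> batch = new ArrayList<>(maxBatch);
        batch.add(first);
        queue.drainTo(batch, maxBatch - 1);
        sink.writeBatch(batch);
        return batch.size();
    }

    public static void main(String[] args) {
        InnerRiverSketch river = new InnerRiverSketch(
                batch -> System.out.println("would write " + batch.size() + " ops in one transaction"),
                100);
        river.enqueue(new Op("index", "1", "{\"name\":\"a\"}"));
        river.enqueue(new Op("delete", "2", null));
        river.drainOnce(50);
    }
}
```

A worker thread would simply call `drainOnce` in a loop until the node shuts down. The sketch deliberately leaves out the hard part, which is making the hand-over between queue and postgres crash safe.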

So far so good, I'm calling it the InnerRiver and I like the approach. Now,
the questions:

  • What do you think about it?
  • Is there any tip, advice or something I should take into account that you
    can think of?
  • There are many post-something operation hooks
    in IndexingOperationListener. Which ones should I care about, and which
    ones should I not (I especially want to know the difference between the lock
    and no lock events)?
  • Any memory concerns I should be thinking of?
  • Some better idea for the transactional queue?
  • Is there an elegant way inside ES to execute worker threads that
    adjust to the node's lifecycle?

Okay, that's a lot of questions (for now! :-)). If you guys like this idea,
I will gladly contribute it to the community!

Thanks a lot in advance!

Cheers,
Nicolas.

--

Hi Nicolas,

I understand you want a kind of integrated distributed system, with two
sides of data storage persistency, one that is reliable, while the other
more or less volatile, for search and indexing.

On the other hand, you want to handle updates at a single interface by the
"volatile" side, by using JSON for your messages, and write them
simultaneously to connected reliable data storage targets, by executing
commits, in order to get the data stay in sync.

You want to borrow Elasticsearch node JVM resources for other non-ES
services like implementing transactional message queues. Probably you want
your clients use the Elasticsearch REST API as the one-and-only message
passing point.

I can imagine a few challenges you will have to tackle.

  • Elasticsearch does not care about transactions, primary shards may move
    or change at any time - your "inner river" will have to do the hard work
    tracing the data all over the cluster to find commit points (possibly by
    the translog mechanism) - this will reduce overall performance a lot

  • you will have to avoid SPOF, consider failover scenarios of a singleton
    "inner river", you have to manage them for yourself (stopping/starting
    without interruption of service)

  • you will have to cope with bottleneck scenarios of data congestion.
    Consider the path your data travel through your system, and the expected
    maximum throughput of the data at any location you want to guarantee (for
    client channels, for message passing, for indexing, and for search)

  • sharing a JVM for ES and e.g. BerkeleyDB will add quite heavy concurrency
    for limited resources, and you will find it tough to make assertions about
    upper limits for search response time or transaction time

Your design could be much easier if you don't reinvent the wheel.

I would recommend using a message queue-based system. First, use
AMQP/RabbitMQ for transactional-style data distribution, then use an AMQP
plugin for Postgresql, like the one shown here
http://lanyrd.com/2011/pgconfeu/smdkp/ and use the RabbitMQ river plugin
for Elasticsearch
http://www.elasticsearch.org/guide/reference/river/rabbitmq.html

With three nodes, you could maintain and scale the RabbitMQ, the
Postgresql, and the ES component independently. It's not a solution
out-of-the-box, but by carefully configuring and integrating the components
and by help of the different communities, you should get it running.

Cheers

Jörg

--

Hi Jörg,

Thanks for your detailed reply, I'll add some comments inline:

I understand you want a kind of integrated distributed system, with two
sides of data storage persistency, one that is reliable, while the other
more or less volatile, for search and indexing.

Not really. I want to use ES as my database, that is, my main store or
SPOT, not just for indexing and search. Postgres would be used to back up
all CRUD operations done over ES, but also so I can exploit that same
info in a relational fashion.

On the other hand, you want to handle updates at a single interface by the
"volatile" side, by using JSON for your messages, and write them
simultaneously to connected reliable data storage targets, by executing
commits, in order to get the data stay in sync.

You want to borrow Elasticsearch node JVM resources for other non-ES
services like implementing transactional message queues. Probably you want
your clients use the Elasticsearch REST API as the one-and-only message
passing point.

I just need a lightweight, in-process queue that is crash resistant to
stack documents that were successfully persisted in ES and then do batch
inserts into postgres from that lightweight queue.

  • Elasticsearch does not care about transactions, primary shards may move
    or change at any time - your "inner river" will have to do the hard work
    tracing the data all over the cluster to find commit points (possibly by
    the translog mechanism) - this will reduce overall performance a lot

If I understood correctly, ES operations are ACID at document level, so as
long as I keep my CRUD operations focused in a single document I should be
fine. I just want to be sure that by hooking at the right event listeners,
I get access to the successfully persisted documents (only for the primary
shards), so I can add them to my queue (that is, per node queue). I'm
concerned about performance as I expect to have at most 1000 writes / sec,
and I don't care how much it takes to replicate that to postgres.

  • you will have to avoid SPOF, consider failover scenarios of a singleton
    "inner river", you have to manage them for yourself (stopping/starting
    without interruption of service)

That's why I want a resilient, although lightweight queue. The inner river
would be implemented on a per-node basis, each instance watching for write
operations on the primary shards.

  • sharing a JVM for ES and e.g. BerkeleyDB will add quite heavy concurrency
    for limited resources, and you will find it tough to make assertions for
    upper search response time or upper transaction time limits

Yes, maybe Berkeley DB is a bit heavy to be used as a queue, especially
because it has a transaction-log-like storage system AFAIK.

Your design could be much easier if you don't reinvent the wheel.

Yes, maybe I am... :-)

I would recommend to use a message queue-based system. First, use
AMQP/RabbitMQ for transactional-style data distribution, then use an AMQP
plugin for Postgresql, like shown here
http://lanyrd.com/2011/pgconfeu/smdkp/ and use the RabbitMQ river plugin
for Elasticsearch
http://www.elasticsearch.org/guide/reference/river/rabbitmq.html

With three nodes, you could maintain and scale the RabbitMQ, the
Postgresql, and the ES component independently. It's not a solution
out-of-the-box, but by carefully configuring and integrating the components
and by help of the different communities, you should get it running.

I thought about the rabbit, but I'm looking for something more lightweight
(maybe embedded ActiveMQ?) and with fewer moving parts, so I can avoid the
maintenance overhead.

Thanks a lot for your input!

Cheers,
Nicolas.

--

Small correction:

"If I understood correctly, ES operations are ACID at document level, so as
long as I keep my CRUD operations focused in a single document I should be
fine. I just want to be sure that by hooking at the right event listeners,
I get access to the successfully persisted documents (only for the primary
shards), so I can add them to my queue (that is, per node queue). I'm
concerned about performance as I expect to have at most 1000 writes / sec,
and I don't care how much it takes to replicate that to postgres."

Sorry, I meant that I'm *NOT* concerned about performance.

--

Hi Nicolas,

On Wednesday, November 21, 2012 10:51:48 PM UTC+1, Nicolas Garfinkiel wrote:

If I understood correctly, ES operations are ACID at document level, so as
long as I keep my CRUD operations focused in a single document I should be
fine. I just want to be sure that by hooking at the right event listeners,
I get access to the successfully persisted documents (only for the primary
shards), so I can add them to my queue (that is, per node queue). I'm
concerned about performance as I expect to have at most 1000 writes / sec,
and I don't care how much it takes to replicate that to postgres.

ES uses a mix of caching and sync mechanisms, so it is not ACID at the
document level by default. In fact, you can force a sync after each document
by flushing after each operation, but you will compromise Lucene's
efficiency. The default is to flush after 5000 ops or after 500mb, whichever
comes first. If you disable this, you will curse ES for being slow...

You can use the listeners ES provides, but they do not tell you about a
true cluster-wide transactional state. As I understand it, the events the
listeners use are local to the node. I have examined the translog mechanism
a little bit, because I want to find out how a node could generate events
about the current indexing in progress and how to propagate the node events
to the outside. My interest is a pubsub mechanism to provide external
subscribers with ES node events in realtime (in the quest for creating a
global _changes stream in perpetuity). Since the events are asynchronous,
it is not easy to model a single ordered stream of events from all the ES
nodes, because the nodes can operate without syncing with each other. A kind
of event collector, located near a requesting client, with a vector clock
seems like a solution to me (with a queue holding the events not consumed
so far).
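
As a rough illustration of the vector clock idea, here is a minimal sketch in plain Java. It is not taken from Elasticsearch; `VectorClockSketch` and the node ids are hypothetical, and a real collector would also need the queue of unconsumed events mentioned above. It shows only the three basic operations: counting a local event, merging a received clock, and deciding whether one clock happened before another or the two are concurrent.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical sketch of a vector clock keyed by node id; not an Elasticsearch class.
class VectorClockSketch {
    private final Map<String, Long> counters = new HashMap<>();

    /** Records one local event on the given node. */
    void tick(String nodeId) {
        counters.merge(nodeId, 1L, Long::sum);
    }

    /** Merges a clock received with an event: element-wise maximum per node. */
    void mergeFrom(VectorClockSketch other) {
        for (Map.Entry<String, Long> e : other.counters.entrySet()) {
            counters.merge(e.getKey(), e.getValue(), Long::max);
        }
    }

    /** True if this clock is <= other for every node and strictly smaller for at least one. */
    boolean happenedBefore(VectorClockSketch other) {
        boolean strictlySmaller = false;
        for (Map.Entry<String, Long> e : counters.entrySet()) {
            long theirs = other.counters.getOrDefault(e.getKey(), 0L);
            if (e.getValue() > theirs) {
                return false;
            }
            if (e.getValue() < theirs) {
                strictlySmaller = true;
            }
        }
        // Nodes known only to the other clock count as 0 on this side.
        for (String nodeId : other.counters.keySet()) {
            if (!counters.containsKey(nodeId)) {
                strictlySmaller = true;
            }
        }
        return strictlySmaller;
    }

    /** True if neither clock happened before the other and they are not equal. */
    boolean concurrentWith(VectorClockSketch other) {
        return !happenedBefore(other)
                && !other.happenedBefore(this)
                && !counters.equals(other.counters);
    }

    VectorClockSketch copy() {
        VectorClockSketch c = new VectorClockSketch();
        c.counters.putAll(counters);
        return c;
    }

    public static void main(String[] args) {
        VectorClockSketch a = new VectorClockSketch();
        a.tick("node1");
        VectorClockSketch b = a.copy();
        b.tick("node2");
        System.out.println("a before b: " + a.happenedBefore(b));
    }
}
```

Events that compare as concurrent have no defined order, which is exactly why a single ordered stream across all nodes is hard to build.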

If you can guarantee that your JVM will assign the resources to your queue
first (and not to the ES indexing and search), you could trust that your
JVM can handle a cache for your maximum queue size, and it could be assumed
to be reliable. But, I assume data congestion can be an issue, even with
"only" 1000 docs per sec...

AFAIK, the challenge of queuing events and ensuring their durability
between JVM restarts is exactly what RabbitMQ was invented
for... and it comes with a nice admin gui...

Cheers,

Jörg

--

Hey Jörg, thanks for your answer, and sorry for my late reply. Comments are
inline.

ES uses a mix of caching and sync mechanisms, so, it is not ACID at
document level by default. In fact, you can force sync after each document
by a flush after each operation, but you will compromise Lucene's
efficiency. The default is after 5000 ops or after 500mb, whatever comes
first. If you disable this, you will curse ES for being slow...

As per the project description on GitHub:

  • Reliable, Asynchronous Write Behind for long term persistency.
  • Per operation consistency
    • Single document level operations are atomic, consistent, isolated
      and durable.

So if I put something, it will survive a crash, right? That's what I care
about.

You can use the listeners ES provides, but they do not tell you about a
true cluster-wide transactional state. What I understand, the events the
listeners use are local to the node.

Yes, I would have the plugin running on each node, listening only to the
primary shards on that node.

I have examined the translog mechanism a little bit, because I want to
find out how a node could generate events about the current indexing in
progress and how to propagate the node events to the outside.

That's interesting... I wonder if I could use the same translog mechanism
to create a unified translog which I can read afterwards from a single
thread, instead of using Berkeley DB or some other queuing mechanism. Of
course events from different shards will not be ordered, but I assume that
events from the same shard will be.

My interest is a pubsub mechanism to provide external subscribers with ES
node events in realtime (in the quest for creating a global _changes stream
in perpetuity). Since the events are asynchronously, it is not easy to
model a single ordered stream of events from all the ES nodes, because the
nodes can operate without syncing each other. A kind of event collector,
located near to a requesting client, with a vector clock seems like a
solution to me (with a queue holding the events not consumed so far).

A great idea indeed. It would sit right in between what a river does
and what I want to achieve. :-)

If you can guarantee that your JVM will assign the resources to your queue
first (and not to the ES indexing and search), you could trust that your
JVM can handle a cache for your maximum queue size, and it could be assumed
to be reliable. But, I assume data congestion can be an issue, even with
"only" 1000 docs per sec...

That's what I'm afraid of... I need a lightweight queue, fast insert rate,
but transaction safe... Too much to ask, right? ;-)

AFAIK, the challenge of event queuing and asserting the durability of
events between JVM restarts, this is exactly what RabbitMQ was invented
for... and it comes with a nice admin gui...

Yes, I know RabbitMQ is great. But using it would add another moving part
to my system, which will have to be maintained, monitored, etc.

Cheers!
Nicolas.

--

Hi Nicolas,

On Wednesday, November 28, 2012 11:06:17 PM UTC+1, Nicolas Garfinkiel wrote:

    • Single document level operations are atomic, consistent, isolated
      and durable.

So if I put something, it will survive a crash, right? That's what I care
about.

Atomicity is per document, and concurrent updates are possible without
losing consistency. There is optimistic locking through the versioning
feature. Durability: your documents survive a crash, because all document
index operations are written to a translog before they get executed. The
translog can be replayed in case of crashes. Elasticsearch uses optimistic
concurrency control. If something crashes, the index can be recovered from
the gateway storage by conflict resolution.
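
To show what optimistic locking by version means in practice, here is a small in-memory sketch. It is an assumption-laden illustration, not Elasticsearch code: `VersionedStoreSketch` and `indexIfVersion` are hypothetical names, and a conflict is signalled with a return value of -1 here purely to keep the sketch short. A write succeeds only if the caller's expected version still matches the stored one, so a concurrent writer cannot be silently overwritten.

```java
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;

// Hypothetical in-memory illustration of version-based optimistic locking.
class VersionedStoreSketch {

    /** Immutable document snapshot: version plus JSON source. */
    static final class Versioned {
        final long version;
        final String source;
        Versioned(long version, String source) {
            this.version = version;
            this.source = source;
        }
    }

    private final ConcurrentMap<String, Versioned> docs = new ConcurrentHashMap<>();

    /**
     * Writes the document only if its current version equals expectedVersion.
     * expectedVersion == 0 means "the document must not exist yet".
     * Returns the new version, or -1 if another writer got there first.
     */
    long indexIfVersion(String id, String source, long expectedVersion) {
        Versioned next = new Versioned(expectedVersion + 1, source);
        if (expectedVersion == 0) {
            return docs.putIfAbsent(id, next) == null ? next.version : -1;
        }
        Versioned current = docs.get(id);
        if (current == null || current.version != expectedVersion) {
            return -1;
        }
        // replace() is atomic: it fails if the entry changed since the get() above.
        return docs.replace(id, current, next) ? next.version : -1;
    }

    Versioned get(String id) {
        return docs.get(id);
    }

    public static void main(String[] args) {
        VersionedStoreSketch store = new VersionedStoreSketch();
        long v1 = store.indexIfVersion("1", "{\"n\":1}", 0);
        long stale = store.indexIfVersion("1", "{\"n\":2}", 0);
        System.out.println("first write version " + v1 + ", stale write result " + stale);
    }
}
```

On a conflict the caller would typically re-read the document, reapply its change and retry with the fresh version.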

What I meant by slow was that if you want all the data to go directly
to disk storage for maximum persistence, you may want to flush the
indices very often, even after each document write. This would be similar
to a "commit" in a transactional system. And that is not what you want in ES
because of the overhead.

That's what I'm afraid of... I need a lightweight queue, fast insert rate,
but transaction safe... Too much to ask, right? ;-)

It depends... as long as you don't touch disks or persistent storage, you
can implement something very simple based on ConcurrentLinkedQueue, and
check that the queue does not grow too much so it can be kept in memory. The
hard part is when you want to persist such queues to external storage and
industry-proven designs like JMS or AMQP are not feasible. You can play
with lots of projects out there. Whether they are robust, I really don't
know. For example, http://square.github.com/tape/ offers a QueueFile,
stating

"QueueFile is a lightning-fast, transactional, file-based FIFO. Addition
and removal from an instance is an O(1) operation and is atomic. Writes are
synchronous; data will be written to disk before an operation returns. The
underlying file is structured to survive process and even system crashes
and if an I/O exception is thrown during a mutating change, the change is
aborted."
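
For the purely in-memory variant, a minimal sketch of a ConcurrentLinkedQueue with a growth check could look like the following. The class name `BoundedMemoryQueueSketch` and the capacity handling are assumptions for illustration. The size is tracked in a separate counter because `ConcurrentLinkedQueue.size()` has to walk the whole queue.

```java
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical sketch: a ConcurrentLinkedQueue that refuses to grow past a fixed capacity.
class BoundedMemoryQueueSketch<T> {
    private final ConcurrentLinkedQueue<T> queue = new ConcurrentLinkedQueue<>();
    // Separate counter, since ConcurrentLinkedQueue.size() is not constant time.
    private final AtomicInteger size = new AtomicInteger();
    private final int capacity;

    BoundedMemoryQueueSketch(int capacity) {
        this.capacity = capacity;
    }

    /** Adds the item unless the queue is full; returns false so the caller can apply back-pressure. */
    boolean offer(T item) {
        if (size.incrementAndGet() > capacity) {
            size.decrementAndGet(); // undo the reservation
            return false;
        }
        queue.add(item);
        return true;
    }

    /** Removes and returns the oldest item, or null if the queue is empty. */
    T poll() {
        T item = queue.poll();
        if (item != null) {
            size.decrementAndGet();
        }
        return item;
    }

    int size() {
        return size.get();
    }

    public static void main(String[] args) {
        BoundedMemoryQueueSketch<String> q = new BoundedMemoryQueueSketch<>(2);
        q.offer("doc-1");
        q.offer("doc-2");
        System.out.println("third offer accepted: " + q.offer("doc-3"));
    }
}
```

Nothing in this sketch survives a JVM restart. It covers only the "kept in memory" case; durability is the part that projects like QueueFile, JMS or AMQP brokers are meant to address.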

Cheers,

Jörg

--