River throughput (single-threaded updates)


(Paul Smith) #1

In terms of an NRT-based index, I was thinking of using a River to have an
application stack send messages via, say, RabbitMQ to ES with updates. The
pros of this approach are that updates are stored even if the application
that is making the updates catches fire (no updates are lost, as they are
queued and processed 'eventually', hopefully in NRT). From looking at the
RabbitMQ river code, it looks like a single thread is responsible for
processing the JMS queue, and it does try to do it in a bulk
fashion, coalescing batched messages into larger chunks for efficient updates
to ES. Even better, the versioning code ensures any out-of-order writes are
discarded.
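That coalescing pattern can be sketched with a plain `BlockingQueue`: one consumer thread blocks for the first message, then drains whatever else has already accumulated into a single batch. This is just the pattern, not the river's actual code; the class and method names here are made up.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;

// Single consumer thread coalescing queued messages into one bulk request.
class CoalescingConsumer {
    private final BlockingQueue<String> queue = new LinkedBlockingQueue<>();
    private final int maxBatch;

    CoalescingConsumer(int maxBatch) { this.maxBatch = maxBatch; }

    // Producers (the hundreds of application threads) call this.
    void submit(String update) { queue.add(update); }

    // Consumer thread: wait for one message, then sweep up everything else
    // already queued so many small updates become a single bulk request.
    List<String> nextBatch() {
        List<String> batch = new ArrayList<>();
        try {
            batch.add(queue.take());
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return batch;
        }
        queue.drainTo(batch, maxBatch - 1);
        return batch; // hand this whole list to one bulk index request
    }
}
```

The nice property is that batch size adapts to load: under light traffic each batch is tiny, and under bursts the consumer naturally picks up bigger chunks.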

Having said that, though, it's still single threaded. In an application
where there are multiple threads (hundreds) producing the updates, there's
a good chance of overrunning a single-threaded queue. In our legacy Lucene-
based architecture we used ActiveMQ with multiple threads consuming a queue
to apply updates. That suffers from its own problems in that there's
eventually contention for updates on specific indexes, so the multiple
threads may not really help in practice.

Another obvious option is to have the application thread itself send
the updates via the Java client (asynchronously, as is the default). This
would batter the ES instance with many small updates, though, and wouldn't
have the luxury of the coalescing strategy the River has. Secondly, while it
is asynchronous like the River, keeping the application code responsive
and not adding latency to user actions, it can suffer from lost updates in
the event the application dies before an update to ES has been sent and
processed. Given a user update action generates a transaction update to
the DB, it's important to try to keep the index in sync as closely as
possible without the hideousness of 2PC (gack). With a JMS queue
based approach ES becomes sort of eventually consistent, laggy
maybe, but never lossy, as long as placing the JMS message in the
queue is part of the same transaction as the DB update (although again that's
2PC; in reality we currently 'live' with a hand-rolled post-db-txn commit
hook that pushes the JMS message, probably no more risky than the simple
async-client approach).
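For what a hand-rolled post-commit hook like that boils down to, here's a minimal sketch: actions (like publishing the JMS message) are registered during the transaction and only run once the DB commit has succeeded, so a rollback never produces a stray index update. The names here are illustrative, not from any framework.

```java
import java.util.ArrayList;
import java.util.List;

// Minimal post-transaction commit hook: queue side effects during the
// transaction, fire them only after a successful DB commit.
class PostCommitHook {
    private final List<Runnable> afterCommit = new ArrayList<>();

    // Called from application code while the transaction is open.
    void onCommit(Runnable action) { afterCommit.add(action); }

    // Called by the transaction machinery after the DB commit succeeds.
    void commit() {
        for (Runnable action : afterCommit) {
            action.run(); // e.g. push the JMS/RabbitMQ message here
        }
        afterCommit.clear();
    }

    // Called on rollback: registered actions are simply dropped.
    void rollback() { afterCommit.clear(); }
}
```

The remaining risk window is exactly the one mentioned above: the process can die after the commit but before the hook runs, which is why this is "probably no more risky" than the async-client approach rather than strictly safe.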

Just wondering what other strategies people have for a 'high'-volume
update NRT strategy? To give some context, looking at our perf data
right now, in peak-ish times we're producing about 10 JMS messages/second
with on average around 15-30 individual 'entity' updates in each payload
(each message may contain multiple entity updates, but a good chunk have just
a small handful, even just 1). I'm sure there are larger-volume sites out
there that may be able to comment; I'd love to hear how they're dealing with
this sort of issue.

cheers,

Paul Smith


(Shay Banon) #2

Heya,

10 messages a second does not sound like much; a single thread pulling from the queue should handle it easily. Note also that the bulk requests are sent in an async manner (though you can pass the ordered flag to disable that) once enough bulk items have accumulated.

Also, though I haven't tested it, I don't see a reason why you won't be able to create several rabbitmq rivers, all running against the same queue; this should give you a nice way to "multi thread" the consumption of the queue. Moreover, if you have several nodes, elasticsearch will try to allocate those rivers on different nodes.
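Since rivers were registered by indexing a `_meta` document into the `_river` index, the multi-river setup would roughly mean indexing the same config under several river names (say `_river/updates_1/_meta` and `_river/updates_2/_meta`), all pointing at one queue. A sketch of such a config; the field names are from memory of the RabbitMQ river plugin, so treat the details as assumptions:

```json
{
  "type": "rabbitmq",
  "rabbitmq": {
    "host": "localhost",
    "queue": "es_updates"
  },
  "index": {
    "bulk_size": 100,
    "ordered": false
  }
}
```

Each registered river then acts as an independent consumer of `es_updates`, which is what gives you the "multi thread" effect.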

As a side note, you keep mentioning JMS. I have been thinking of creating a JMS river, if that's something people are interested in. Rabbit seemed like a good initial messaging river, simply because many non-Java users tend to use it more than JMS (maybe over Stomp).

-shay.banon
On Tuesday, March 22, 2011 at 1:30 AM, Paul Smith wrote:



(Paul Smith) #3

On 22 March 2011 10:40, Shay Banon shay.banon@elasticsearch.com wrote:

Heya,

10 messages a second does not sound like much; a single thread pulling from
the queue should handle it easily. Note also that the bulk requests are sent
in an async manner (though you can pass the ordered flag to disable
that) once enough bulk items have accumulated.

No, not much at all really. Peaks can be large though, just wanted to set
it in context, but the volume of growth is such that I wanted to think ahead
of the curve.

Also, though I haven't tested it, I don't see a reason why you won't be
able to create several rabbitmq rivers, all running against the same queue;
this should give you a nice way to "multi thread" the consumption of the
queue. Moreover, if you have several nodes, elasticsearch will try to
allocate those rivers on different nodes.

So create, say, X distinct Rivers all pointing to the same queue? Good idea
(if needed).

As a side note, you keep mentioning JMS. I have been thinking of
creating a JMS river, if that's something people are interested in. Rabbit
seemed like a good initial messaging river, simply because many non-Java
users tend to use it more than JMS (maybe over Stomp).

Yes, refactoring the RabbitMQ river to be JMS-standard would be a huge boon
for those with existing infrastructure set up. Even refactoring it into
an abstract parent class to make individual JMS vendor-specific plugins
easier would be useful, but honestly, given the payload of each message is a
JSON string, a JMS spec-compliant river would be simple, maybe via a
jndi.context file (which I'm no fan of, but at least it's implementation
agnostic) or having a Factory class defined at runtime in the settings to
create the ConnectionFactory.
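The "Factory class defined in the settings" idea could look something like this: load a vendor-specific provider class by name and ask it for the ConnectionFactory. All names here are hypothetical, and `Object` stands in for `javax.jms.ConnectionFactory` to keep the sketch self-contained.

```java
// A vendor-neutral provider interface the river could define (hypothetical).
interface ConnectionFactoryProvider {
    Object createConnectionFactory(); // would really return javax.jms.ConnectionFactory
}

// Example stand-in for a vendor provider, e.g. one wrapping ActiveMQ's
// factory (hypothetical).
class StubJmsProvider implements ConnectionFactoryProvider {
    public Object createConnectionFactory() { return "stub-connection-factory"; }
}

class JmsRiverBootstrap {
    // className would come from the river's settings document, e.g.
    // "com.example.ActiveMqFactoryProvider" (hypothetical).
    static Object connectionFactory(String className) {
        try {
            ConnectionFactoryProvider provider = (ConnectionFactoryProvider)
                    Class.forName(className).getDeclaredConstructor().newInstance();
            return provider.createConnectionFactory();
        } catch (ReflectiveOperationException e) {
            throw new IllegalStateException("cannot load provider " + className, e);
        }
    }
}
```

This keeps the river code free of any vendor imports: each JMS vendor just ships one small provider class on the classpath.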


(Berkay Mollamustafaoglu-2) #4

Hey Paul,

In our experience, ES easily handles 1000 msgs per second on a single server
thanks to its architecture: bulk, async, shards, etc. I can sympathize
with the urge to stay ahead of the curve, especially if you were burned
before; ES sort of sounds too good to be true :)
ES has a strong design to handle heavy loads, so you'd need messages in the
many thousands before you might have to do anything really sophisticated.
Just my 1.23 cents

Regards,
Berkay Mollamustafaoglu
mberkay on yahoo, google and skype

On Mon, Mar 21, 2011 at 8:07 PM, Paul Smith tallpsmith@gmail.com wrote:



(Paul Smith) #5

On 22 March 2011 11:16, Berkay Mollamustafaoglu mberkay@gmail.com wrote:

Hey Paul,

In our experience, ES easily handles 1000 msgs per second on a single
server thanks to its architecture: bulk, async, shards, etc. I can
sympathize with the urge to stay ahead of the curve, especially if you were
burned before; ES sort of sounds too good to be true :)
ES has a strong design to handle heavy loads, so you'd need messages in the
many thousands before you might have to do anything really sophisticated.
Just my 1.23 cents

It's not necessarily the rate of msgs per second that is the overall
concern, though, but the payload within them. Let's take a real-world example
here: we have a business rule that during an Organization rename unfortunately
requires a bulk reindex of a lot of information so that it can be searched
with both the original historical name and the new name. In some cases the
amount of data to be reindexed is large (in a recent case, 1.6 million
items, and I've seen much larger than that). If a single message contains
this über-update, it is highly likely to clog the River plumbing, causing
latency in the NRT, something our customers loathe (and for good reason). I
suspect having a specific Über-River channel for large bulk updates is
probably a nicer way to ensure throughput of the usual lighter-weight
update messages: anything larger than some number X is routed
through a Bulk river for processing.

Thanks Shay for the 'multiple River' idea, it's such a simple concept, I
can't believe I didn't think of it myself.

Paul


(Shay Banon) #6

By the way, we can remove the need for dist. transactions by using a database table as the "queue". A simple table with columns for the metadata, payload, and order (an auto-increment id is probably simplest) which a river can listen on and pull changes from. Then, when you do your operations against the DB, you just add the operations that need to be indexed to the database in a single atomic transaction.

Can get tricky, especially when trying to support multiple databases and syncing between several rivers working on the same table, but possible. Posting it here so I won't lose this thought that just crossed my mind :)
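A hypothetical schema for that table-as-queue idea might look like the following; all names, types, and the dialect are illustrative, and the polling queries are only a conceptual sketch of what a river would run:

```sql
-- The application inserts one row per index operation inside the SAME
-- transaction as its business updates; a river polls rows in id order,
-- bulk-indexes them, then deletes them.
CREATE TABLE es_index_queue (
    id        BIGINT AUTO_INCREMENT PRIMARY KEY, -- provides the ordering
    op_type   VARCHAR(16)  NOT NULL,             -- 'index' or 'delete'
    doc_index VARCHAR(64)  NOT NULL,
    doc_type  VARCHAR(64)  NOT NULL,
    doc_id    VARCHAR(64)  NOT NULL,
    payload   TEXT                               -- the JSON source
);

-- River polling loop, conceptually:
--   SELECT * FROM es_index_queue ORDER BY id LIMIT 500;
--   ... bulk index the rows into ES ...
--   DELETE FROM es_index_queue WHERE id <= :last_seen_id;
```

Because the enqueue happens in the same DB transaction as the business change, a rollback discards the pending index operation for free, which is exactly what removes the need for 2PC.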
On Wednesday, March 23, 2011 at 12:12 AM, Paul Smith wrote:

On 22 March 2011 11:16, Berkay Mollamustafaoglu mberkay@gmail.com wrote:

Hey Paul,

In our experience, ES easily handles 1000 msgs per second on a single server thanks to it's architecture, bulk, async, shards, etc. I can sympathize with the urge to stay ahead of the curve, especially if you were burned before, ES sort of sounds too good to be true :slight_smile:
ES has a strong design to handle heavy loads so you'd need messages in many thousands before you may have to do anything really sophisticated. Just my 1.23 cents

It's not necessarily the rate of msgs per second though that is the overall concern, but the payload within them. Lets take a real world example here, we have a business rule that during an Organization rename unfortunately requires a bulk reindex of a lot of information so that it can be searched with both the original historical name and the new name. In some cases the amount of data to be reindexed is large (in a recent case, 1.6 million items, and I've seen much larger than that). If a single message contains this über-update it is highly likely to clog the River plumbing causing latency in the NRT, something our customers loathe (and for good reason). I suspect having a specific Über-River channel for large bulk updates is probably a nicer way to ensure throughput of the usual lighter weight updates messages, anything larger than some number X is routed through a Bulk river for processing.

Thanks Shay for the 'multiple River' idea, it's such a simple concept, I can't believe I didn't think of it myself.

Paul


(Paul Smith) #7

On 23 March 2011 10:05, Shay Banon shay.banon@elasticsearch.com wrote:

By the way, we can remove the need for dist. transactions by using a
database table as the "queue". A simple table with columns for the
metadata, payload, and order (an auto-increment id is probably simplest) which
a river can listen on and pull changes from. Then, when you do your operations
against the DB, you just add the operations that need to be indexed to
the database in a single atomic transaction.

Can get tricky, especially when trying to support multiple databases and
syncing between several rivers working on the same table, but possible.
Posting it here so I won't lose this thought that just crossed my mind :)

It'd be interesting to see the concurrency of implementing the queue in a
table. Using a clustered index on the insertion time means the consumer River
can 'eat' rows from the front while new rows are added at the tail
of the table, ensuring that lock contention doesn't occur or is at least
reduced. At the end of the day most JMS implementations use JDBC for a backing
store, so they'd face similar issues, but I think most JMS vendors get their
performance from a local in-memory queue that is buffered/backed by a WAL,
with perhaps a secondary JDBC store for longer-term persistence (this is what
ActiveMQ does, as I understand it).

