Performing Upserts in a River

Hi,

Any input or suggestions for the following would be welcome:

I am working on a project which aims to store some dynamic data about
customers within Elasticsearch.
The idea is that we can keep a profile of customer actions and be able to
search easily on various attributes of the customers.
The customer profile will be updated based on messages coming into a
RabbitMQ queue.

So the approach we are currently taking is to modify the RabbitMQRiver
plugin so that, instead of doing bulk updates, it performs upserts on the
customers based on their ID.
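
For illustration, a single upsert keyed on the customer ID would look roughly
like this with the Java API (the index, type and field names here are just
placeholders, not our real mapping):

import static org.elasticsearch.common.xcontent.XContentFactory.jsonBuilder;

import org.elasticsearch.action.index.IndexRequest;
import org.elasticsearch.action.update.UpdateRequest;
import org.elasticsearch.client.Client;

public class CustomerUpsert {

    // Placeholder index/type/field names, for illustration only.
    public static void upsertCustomer(Client client, String customerId,
                                      String lastAction) throws Exception {
        // Full document to index if the customer does not exist yet.
        IndexRequest insert = new IndexRequest("customers", "customer", customerId)
                .source(jsonBuilder()
                        .startObject()
                            .field("customer_id", customerId)
                            .field("last_action", lastAction)
                        .endObject());

        // Partial document to merge into an existing customer profile.
        UpdateRequest update = new UpdateRequest("customers", "customer", customerId)
                .doc(jsonBuilder()
                        .startObject()
                            .field("last_action", lastAction)
                        .endObject())
                .upsert(insert);

        client.update(update).actionGet();
    }
}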

The query load on ES will not be particularly high, so we have tried to
optimise the cluster (currently only 2 nodes) for indexing rather than
querying performance.

Does this sound like a reasonable approach? At this stage our river is
processing around 300-500 messages/s from the RabbitMQ queue.

It appears that a River runs as a singleton within only one node of the
cluster. With that in mind, would it be possible to get better performance
by having multiple workers listening on the RabbitMQ queue and individually
executing upserts to the ES cluster?
Additionally, as the River only runs on a single node, does this imply that
to scale up and process more messages, the best option is to increase the
machine size?

We have tried the following to boost the speed of the processing as well:

  • increased default shards to 20
  • increased the indexing_buffer to 20%

Thanks for any advice :)


Hi,

a river is created as an instance; each river instance is a
cluster-wide singleton.

It is up to the river how indexing is executed.

Note that bulk updates of existing documents are not implemented in ES. You
can mix create, insert (overwrite), and delete operations in bulk mode.
To update existing documents in bulk mode, the bulk indexer would need
an additional notion of how to order incoming requests (it receives
operation requests from many concurrent sources, so something like vector
clocks would be needed), and that is not there yet.

I do not understand what "better performance" means exactly; can you
specify what kind of performance you are interested in? Less system
load? Faster baseline (initial) loading? Lower latency? Higher indexing
throughput?

If the input can be segmented (RabbitMQ comes with the notion of
channels), you can run many rivers in parallel. You can also distribute
river instances over many nodes; just install a river instance for a
channel on the node you want.
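
For example (a rough sketch only; the host, credentials, queue names and the
rabbitmq river settings below are guesses at your setup, so adjust them), you
could register one river instance per queue like this:

import static org.elasticsearch.common.xcontent.XContentFactory.jsonBuilder;

import org.elasticsearch.client.Client;

public class RegisterRivers {

    // Registers a rabbitmq river instance that consumes the given queue.
    // Host, credentials and queue names are placeholders.
    public static void register(Client client, String riverName, String queue)
            throws Exception {
        client.prepareIndex("_river", riverName, "_meta")
                .setSource(jsonBuilder()
                        .startObject()
                            .field("type", "rabbitmq")
                            .startObject("rabbitmq")
                                .field("host", "localhost")
                                .field("port", 5672)
                                .field("user", "guest")
                                .field("pass", "guest")
                                .field("queue", queue)
                            .endObject()
                        .endObject())
                .execute().actionGet();
    }
}

Calling register(client, "customers_river_1", "customers_1") and
register(client, "customers_river_2", "customers_2") would then give you two
rivers consuming two separate queues.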

In many cases you don't need to increase the "machine size" (assuming
you mean RAM and disk size). I don't know what size you have
in mind, so there is no good answer. Note that ES can scale horizontally
very well; that is, instead of one big machine you can use smaller machines,
just more of them.

If you increase the number of shards, you may have better distribution
of load, but a bit slower overall indexing (which is acceptable).

You mention indexing_buffer; what do you mean? Do you mean
"max_shard_index_buffer_size"?

Best regards,

Jörg



Hiya


Rather than modifying the rabbitmq river, I suggest writing your own
rabbitmq listener, which then indexes/updates/does whatever into ES.

Having it as a standalone process is a good deal easier and more flexible than
using the river. Also, you can parallelise it, while the river runs as only
a single instance.

Also, use the bulk APIs as much as possible.
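
Something along these lines, for example (completely untested, and the host,
queue and index names are made up; I'm also assuming the message body is the
full customer document and the routing key carries the customer ID):

import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.QueueingConsumer;

import org.elasticsearch.action.bulk.BulkRequestBuilder;
import org.elasticsearch.client.Client;
import org.elasticsearch.client.transport.TransportClient;
import org.elasticsearch.common.transport.InetSocketTransportAddress;

public class CustomerListener {

    public static void main(String[] args) throws Exception {
        // RabbitMQ connection (host and queue name are placeholders).
        ConnectionFactory factory = new ConnectionFactory();
        factory.setHost("localhost");
        Connection connection = factory.newConnection();
        Channel channel = connection.createChannel();
        channel.queueDeclare("customers", true, false, false, null);

        // Elasticsearch transport client (address is a placeholder).
        Client client = new TransportClient()
                .addTransportAddress(new InetSocketTransportAddress("localhost", 9300));

        QueueingConsumer consumer = new QueueingConsumer(channel);
        channel.basicConsume("customers", false, consumer);

        while (true) {
            // Drain up to 500 messages (waiting at most 100 ms for each)
            // and send them to ES as a single bulk request.
            BulkRequestBuilder bulk = client.prepareBulk();
            long lastTag = 0;
            for (int i = 0; i < 500; i++) {
                QueueingConsumer.Delivery delivery = consumer.nextDelivery(100);
                if (delivery == null) {
                    break;
                }
                lastTag = delivery.getEnvelope().getDeliveryTag();
                String customerId = delivery.getEnvelope().getRoutingKey();
                bulk.add(client.prepareIndex("customers", "customer", customerId)
                        .setSource(delivery.getBody()));
            }
            if (bulk.numberOfActions() > 0) {
                bulk.execute().actionGet();
                // Ack everything up to the last message; no error handling here.
                channel.basicAck(lastTag, true);
            }
        }
    }
}

You can run as many of these workers as you like, on the same queue or on
separate queues.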

We have tried the following to boost the speed of the processing as
well:

  • increased default shards to 20

unless you have more nodes, that won't make any difference

And having more nodes will definitely increase throughput

clint


Thanks for the feedback on this. I just thought I'd post an update of where
I got to:

The reason I was looking at a river is that it runs internally on an ES
node, not externally, so it can connect directly with ES using the Java
API instead of a worker process communicating over HTTP. I guess another
option would be to use Thrift, or to run the worker on the ES nodes to
minimise network latency.

I have managed to modify the example RabbitMQ river to do upserts (only one
at a time, though) instead of the bulk insert, and this seems to work quite
well.
It is performing adequately at the moment, and I can also just run more
than one of these rivers in parallel if I need to increase throughput, which
seems to improve performance quite well.

In future, if I find the rivers don't scale well, I'll look into the idea
of external worker processes doing the same upserts outside the cluster.
I also found (as most people suggest) that tuning the mapping and the ES
settings has helped as well.


Just another follow-up: since the 0.90 release I've modified the river to
perform bulk upserts, which improves performance further.
We can run several of these rivers in parallel and it's performing pretty
well, indexing over 1000 complex docs (each with several nested docs) per
second on a 2-node cluster (fairly beefy machines).
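
For reference, the bulk upsert part of the river now boils down to something
like this (simplified, and the index/type names are placeholders rather than
our real mapping):

import java.util.Map;

import org.elasticsearch.action.bulk.BulkRequestBuilder;
import org.elasticsearch.action.bulk.BulkResponse;
import org.elasticsearch.action.index.IndexRequest;
import org.elasticsearch.action.update.UpdateRequest;
import org.elasticsearch.client.Client;

public class BulkUpserts {

    // Adds one upsert per (customerId -> partial JSON doc) entry and executes
    // them as a single bulk request. Index and type names are placeholders.
    public static void bulkUpsert(Client client, Map<String, String> partialDocs) {
        BulkRequestBuilder bulk = client.prepareBulk();
        for (Map.Entry<String, String> entry : partialDocs.entrySet()) {
            String id = entry.getKey();
            String doc = entry.getValue(); // partial customer document as JSON
            bulk.add(new UpdateRequest("customers", "customer", id)
                    .doc(doc)
                    .upsert(new IndexRequest("customers", "customer", id).source(doc)));
        }
        BulkResponse response = bulk.execute().actionGet();
        if (response.hasFailures()) {
            // In the real river we log and retry failed items; omitted here.
            System.err.println(response.buildFailureMessage());
        }
    }
}

That, combined with running several rivers in parallel, is what gets us to the
throughput above.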
