ES with RabbitMQ river and lost messages


(Guozhu Wei) #1

I use ES with the RabbitMQ river to index about 120,000 docs. I wrote a
Python client that publishes the docs to RabbitMQ at about 300 docs per
second, and I made the messages durable.

While ES is running, indexing goes smoothly and no messages are lost.

I wanted to test whether messages would be lost at a higher publish rate.
This is how I tested:

  1. Stop ES, but keep writing all 120,000 messages to the queue.
  2. Once all the messages are in the queue, start ES again.

In this situation the publish rate is about 5,000 docs per second, and
~70,000 messages are lost. I found exceptions in my log file:

message [EsRejectedExecutionException[rejected execution (queue capacity 50) on
org.elasticsearch.action.support.replication.TransportShardReplicationOperationAction$AsyncShardOperationAction$1@67a0e65]]

I thought this might be due to the high publish rate, so I changed some
parameters (num_prefetch and num_consumers, which are new in the nightly
build; see
https://github.com/elasticsearch/elasticsearch-river-rabbitmq/pull/11) and
tested again, but 50,000 messages were still lost. I also changed bulk_size
and bulk_timeout, and that did not help either.

Any suggestions for solving this problem? Thanks!

Below is the river config:

PUT /_river/my_river/_meta
{
  "type": "rabbitmq",
  "rabbitmq": {
    "host": "192.168.18.182",
    "port": 5672,
    "user": "guest",
    "pass": "guest",
    "vhost": "/",
    "queue": "elasticsearch",
    "exchange": "elasticsearch",
    "routing_key": "elasticsearch",
    "exchange_type": "direct",
    "exchange_durable": true,
    "queue_durable": true,
    "queue_auto_delete": false,
    "num_prefetch": 200,
    "num_consumers": 1
  },
  "index": {
    "bulk_size": 100,
    "bulk_timeout": "10s",
    "ordered": false
  }
}

--
You received this message because you are subscribed to the Google Groups "elasticsearch" group.
To unsubscribe from this group and stop receiving emails from it, send an email to elasticsearch+unsubscribe@googlegroups.com.
For more options, visit https://groups.google.com/groups/opt_out.


(Matt Michalowski) #2

I think you need to increase your 'threadpool.bulk.queue_size' (and
possibly 'threadpool.index.queue_size') settings, because the defaults
changed recently.
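
To illustrate why a small queue_size produces rejections under a burst, here is a minimal Java sketch. It assumes the bulk thread pool behaves like a fixed pool with a bounded queue that rejects work when full, which is what the "queue capacity 50" in the log message suggests. It uses only java.util.concurrent, not Elasticsearch itself; BoundedQueueDemo and countRejections are hypothetical names made up for the illustration, and the numbers are examples rather than recommended settings.

```java
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

final class BoundedQueueDemo {
    /**
     * Submits `tasks` jobs in one burst to a pool with `threads` workers and a
     * queue holding `queueSize` entries, and returns how many were rejected.
     */
    static int countRejections(int threads, int queueSize, int tasks) {
        ThreadPoolExecutor pool = new ThreadPoolExecutor(
                threads, threads, 0L, TimeUnit.MILLISECONDS,
                new ArrayBlockingQueue<>(queueSize),
                new ThreadPoolExecutor.AbortPolicy()); // throw when the queue is full
        CountDownLatch release = new CountDownLatch(1);
        int rejected = 0;
        try {
            for (int i = 0; i < tasks; i++) {
                try {
                    // Every task blocks until released, so nothing drains
                    // from the queue while the burst is being submitted.
                    pool.execute(() -> {
                        try {
                            release.await();
                        } catch (InterruptedException e) {
                            Thread.currentThread().interrupt();
                        }
                    });
                } catch (RejectedExecutionException e) {
                    rejected++; // this is the work that would be dropped
                }
            }
        } finally {
            release.countDown(); // let the accepted tasks finish
            pool.shutdown();
        }
        return rejected;
    }
}
```

With one worker and a queue of 50, a burst of 120 blocked tasks is accepted 51 times (one running, 50 queued) and rejected 69 times; with a queue of 200 the same burst is accepted in full. A larger queue only absorbs bursts, so the consumer still has to be slowed down or made to retry if the sustained rate is too high.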

See:
https://github.com/elasticsearch/elasticsearch/issues/3888

Matt.



(Guozhu Wei) #3

Thanks!

I changed the code to test the QoS setting and rebuilt the plugin. The QoS
now works and nothing is lost. I think changing the thread pool queue size
as you suggested would also work.

private class Consumer implements Runnable {

    private Connection connection;

    private Channel channel;

    @Override
    public void run() {
        while (true) {
            if (closed) {
                break;
            }
            try {
                connection = connectionFactory.newConnection(rabbitAddresses);
                channel = connection.createChannel();
                // Prefetch at most 200 unacknowledged messages at a time
                channel.basicQos(200);
                // ... (rest of the method not shown)

I didn’t dig into the code, but I think that if the indexer raises an
exception, the river should not simply ack the message.
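
A minimal sketch of that idea, not the river's actual code: ack a batch only after indexing succeeds, and hand it back to the broker otherwise. AckChannel, BulkIndexer, AckOnSuccess and RecordingChannel are hypothetical names introduced for the illustration. The two AckChannel methods mirror basicAck and basicNack on the RabbitMQ Java client's Channel (the real ones also declare IOException), so the sketch runs without a broker.

```java
import java.util.ArrayList;
import java.util.List;

/** Hypothetical stand-in for the two RabbitMQ Channel calls used here. */
interface AckChannel {
    void basicAck(long deliveryTag, boolean multiple);
    void basicNack(long deliveryTag, boolean multiple, boolean requeue);
}

/** Hypothetical indexing step: returns normally on success, throws on failure. */
interface BulkIndexer {
    void index(List<String> docs) throws Exception;
}

final class AckOnSuccess {
    /**
     * Indexes a batch, then settles every delivery up to lastDeliveryTag.
     * Returns true if the batch was acked, false if it was requeued.
     */
    static boolean settle(AckChannel channel, long lastDeliveryTag,
                          List<String> docs, BulkIndexer indexer) {
        try {
            indexer.index(docs);
        } catch (Exception e) {
            // Indexing failed (for example a rejected execution): return the
            // messages to the queue instead of acking them.
            channel.basicNack(lastDeliveryTag, true, true);
            return false;
        }
        // Ack only once the documents have actually been indexed.
        channel.basicAck(lastDeliveryTag, true);
        return true;
    }
}

/** Records calls so the behaviour can be checked without a broker. */
final class RecordingChannel implements AckChannel {
    final List<String> calls = new ArrayList<>();

    public void basicAck(long deliveryTag, boolean multiple) {
        calls.add("ack:" + deliveryTag);
    }

    public void basicNack(long deliveryTag, boolean multiple, boolean requeue) {
        calls.add("nack:" + deliveryTag + ":requeue=" + requeue);
    }
}
```

Requeuing on failure can redeliver the same batch repeatedly if the failure is permanent (for example a malformed document), so a real consumer would probably want a retry limit or a dead-letter queue as well. A bulk request can also report per-document failures without throwing, which this sketch does not model.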

Thank you for your response!

Gary.

On Friday, November 15, 2013 2:25:47 PM UTC+8, Matt Michalowski wrote:

I think you need to increase your 'threadpool.bulk.queue_size' (and
possibly 'threadpool.index.queue_size') setting due to recent defaults.

See:
https://github.com/elasticsearch/elasticsearch/issues/3888

Matt.


