Rabbitmq river stops working


(Max Kossatz) #1

Hi,

we are using elasticsearch for some time now and are very impressed how
good it works.
We are using ES in combination with rabbitmq and indexing around 1 million
documents a day over this rabbitmq-river into ES.
It works great! But from time to time (once a week) we get this "warning"
in the logifle:

[2012-04-09 12:19:15,724][WARN ][river.rabbitmq ] [Malekith the
Accursed] [rabbitmq][my_river8] failed to parse request for delivery tag
[2965504], ack'ing...
at
org.elasticsearch.river.rabbitmq.RabbitmqRiver$Consumer.run(RabbitmqRiver.java:240)
at
org.elasticsearch.common.xcontent.XContentFactory.xContent(XContentFactory.java:147)
at
org.elasticsearch.action.bulk.BulkRequest.add(BulkRequest.java:92)
at
org.elasticsearch.action.bulk.BulkRequestBuilder.add(BulkRequestBuilder.java:81)
at
org.elasticsearch.river.rabbitmq.RabbitmqRiver$Consumer.run(RabbitmqRiver.java:240)
at java.lang.Thread.run(Thread.java:636)

And then the river stops to bulk insert documents into ES. We always have
to restart the river by deleting the river and making it new, some times
even under a different name, otherwise it will not connect to the rabbitmq
server again.

this are our river-settings:
{
"type":"rabbitmq",
"rabbitmq":{
"host":"HOST_IP",
"port":5672,
"user":"USERNAME",
"pass":"PASSWORD",
"vhost":"/",
"queue":"SOME_NAME",
"exchange":"SOME_NAME",
"routing_key":"SOME_NAME",
"exchange_type":"SOME_NAME",
"exchange_durable":true,
"queue_durable":true,
"queue_auto_delete":false
},
"index":{
"bulk_size":100,
"bulk_timeout":"10s",
"ordered":true
}
}

Any ideas why? And is there any chance to see what bulkinsert made this
error?
Thank you & keep up the great work!
Max

Any ideas why? And why is the river not restarting again?


(Shay Banon) #2

Strange... . I can see that it acks the failed message (which happens when
the format of the bulk indexing message fails to parse). What I can see is
that in this case, a delivery tag will be ack'ed twice, which I am not
sure, once for the failure, and another later on, I can fix that, though I
am not sure why this will cause rabbitmq to stop sending messages...

On Mon, Apr 9, 2012 at 6:21 PM, Max Kossatz max.kossatz@gmail.com wrote:

Hi,

we are using elasticsearch for some time now and are very impressed how
good it works.
We are using ES in combination with rabbitmq and indexing around 1 million
documents a day over this rabbitmq-river into ES.
It works great! But from time to time (once a week) we get this "warning"
in the logifle:

[2012-04-09 12:19:15,724][WARN ][river.rabbitmq ] [Malekith the
Accursed] [rabbitmq][my_river8] failed to parse request for delivery tag
[2965504], ack'ing...
at
org.elasticsearch.river.rabbitmq.RabbitmqRiver$Consumer.run(RabbitmqRiver.java:240)
at
org.elasticsearch.common.xcontent.XContentFactory.xContent(XContentFactory.java:147)
at
org.elasticsearch.action.bulk.BulkRequest.add(BulkRequest.java:92)
at
org.elasticsearch.action.bulk.BulkRequestBuilder.add(BulkRequestBuilder.java:81)
at
org.elasticsearch.river.rabbitmq.RabbitmqRiver$Consumer.run(RabbitmqRiver.java:240)
at java.lang.Thread.run(Thread.java:636)

And then the river stops to bulk insert documents into ES. We always have
to restart the river by deleting the river and making it new, some times
even under a different name, otherwise it will not connect to the rabbitmq
server again.

this are our river-settings:
{
"type":"rabbitmq",
"rabbitmq":{
"host":"HOST_IP",
"port":5672,
"user":"USERNAME",
"pass":"PASSWORD",
"vhost":"/",
"queue":"SOME_NAME",
"exchange":"SOME_NAME",
"routing_key":"SOME_NAME",
"exchange_type":"SOME_NAME",
"exchange_durable":true,
"queue_durable":true,
"queue_auto_delete":false
},
"index":{
"bulk_size":100,
"bulk_timeout":"10s",
"ordered":true
}
}

Any ideas why? And is there any chance to see what bulkinsert made this
error?
Thank you & keep up the great work!
Max

Any ideas why? And why is the river not restarting again?


(Shay Banon) #3

Here is the issue:
https://github.com/elasticsearch/elasticsearch-river-rabbitmq/issues/9.

On Wed, Apr 11, 2012 at 12:39 PM, Shay Banon kimchy@gmail.com wrote:

Strange... . I can see that it acks the failed message (which happens when
the format of the bulk indexing message fails to parse). What I can see is
that in this case, a delivery tag will be ack'ed twice, which I am not
sure, once for the failure, and another later on, I can fix that, though I
am not sure why this will cause rabbitmq to stop sending messages...

On Mon, Apr 9, 2012 at 6:21 PM, Max Kossatz max.kossatz@gmail.com wrote:

Hi,

we are using elasticsearch for some time now and are very impressed how
good it works.
We are using ES in combination with rabbitmq and indexing around 1
million documents a day over this rabbitmq-river into ES.
It works great! But from time to time (once a week) we get this "warning"
in the logifle:

[2012-04-09 12:19:15,724][WARN ][river.rabbitmq ] [Malekith the
Accursed] [rabbitmq][my_river8] failed to parse request for delivery tag
[2965504], ack'ing...
at
org.elasticsearch.river.rabbitmq.RabbitmqRiver$Consumer.run(RabbitmqRiver.java:240)
at
org.elasticsearch.common.xcontent.XContentFactory.xContent(XContentFactory.java:147)
at
org.elasticsearch.action.bulk.BulkRequest.add(BulkRequest.java:92)
at
org.elasticsearch.action.bulk.BulkRequestBuilder.add(BulkRequestBuilder.java:81)
at
org.elasticsearch.river.rabbitmq.RabbitmqRiver$Consumer.run(RabbitmqRiver.java:240)
at java.lang.Thread.run(Thread.java:636)

And then the river stops to bulk insert documents into ES. We always have
to restart the river by deleting the river and making it new, some times
even under a different name, otherwise it will not connect to the rabbitmq
server again.

this are our river-settings:
{
"type":"rabbitmq",
"rabbitmq":{
"host":"HOST_IP",
"port":5672,
"user":"USERNAME",
"pass":"PASSWORD",
"vhost":"/",
"queue":"SOME_NAME",
"exchange":"SOME_NAME",
"routing_key":"SOME_NAME",
"exchange_type":"SOME_NAME",
"exchange_durable":true,
"queue_durable":true,
"queue_auto_delete":false
},
"index":{
"bulk_size":100,
"bulk_timeout":"10s",
"ordered":true
}
}

Any ideas why? And is there any chance to see what bulkinsert made this
error?
Thank you & keep up the great work!
Max

Any ideas why? And why is the river not restarting again?


(Max Kossatz) #4

Great, thank you very much for the fast help!

Max

On Wednesday, 11 April 2012 11:42:05 UTC+2, kimchy wrote:

Here is the issue:
https://github.com/elasticsearch/elasticsearch-river-rabbitmq/issues/9.

On Wed, Apr 11, 2012 at 12:39 PM, Shay Banon kimchy@gmail.com wrote:

Strange... . I can see that it acks the failed message (which happens
when the format of the bulk indexing message fails to parse). What I can
see is that in this case, a delivery tag will be ack'ed twice, which I am
not sure, once for the failure, and another later on, I can fix that,
though I am not sure why this will cause rabbitmq to stop sending
messages...

On Mon, Apr 9, 2012 at 6:21 PM, Max Kossatz max.kossatz@gmail.comwrote:

Hi,

we are using elasticsearch for some time now and are very impressed how
good it works.
We are using ES in combination with rabbitmq and indexing around 1
million documents a day over this rabbitmq-river into ES.
It works great! But from time to time (once a week) we get this
"warning" in the logifle:

[2012-04-09 12:19:15,724][WARN ][river.rabbitmq ] [Malekith
the Accursed] [rabbitmq][my_river8] failed to parse request for delivery
tag [2965504], ack'ing...
at
org.elasticsearch.river.rabbitmq.RabbitmqRiver$Consumer.run(RabbitmqRiver.java:240)
at
org.elasticsearch.common.xcontent.XContentFactory.xContent(XContentFactory.java:147)
at
org.elasticsearch.action.bulk.BulkRequest.add(BulkRequest.java:92)
at
org.elasticsearch.action.bulk.BulkRequestBuilder.add(BulkRequestBuilder.java:81)
at
org.elasticsearch.river.rabbitmq.RabbitmqRiver$Consumer.run(RabbitmqRiver.java:240)
at java.lang.Thread.run(Thread.java:636)

And then the river stops to bulk insert documents into ES. We always
have to restart the river by deleting the river and making it new, some
times even under a different name, otherwise it will not connect to the
rabbitmq server again.

this are our river-settings:
{
"type":"rabbitmq",
"rabbitmq":{
"host":"HOST_IP",
"port":5672,
"user":"USERNAME",
"pass":"PASSWORD",
"vhost":"/",
"queue":"SOME_NAME",
"exchange":"SOME_NAME",
"routing_key":"SOME_NAME",
"exchange_type":"SOME_NAME",
"exchange_durable":true,
"queue_durable":true,
"queue_auto_delete":false
},
"index":{
"bulk_size":100,
"bulk_timeout":"10s",
"ordered":true
}
}

Any ideas why? And is there any chance to see what bulkinsert made this
error?
Thank you & keep up the great work!
Max

Any ideas why? And why is the river not restarting again?


(system) #5