RabbitMQ river stops working

Hi,

We have been using Elasticsearch for some time now and are very impressed with how well it works. We use ES in combination with RabbitMQ, indexing around 1 million documents a day through the rabbitmq-river into ES.
It works great! But from time to time (about once a week) we get this warning in the logfile:

[2012-04-09 12:19:15,724][WARN ][river.rabbitmq ] [Malekith the Accursed] [rabbitmq][my_river8] failed to parse request for delivery tag [2965504], ack'ing...
    at org.elasticsearch.river.rabbitmq.RabbitmqRiver$Consumer.run(RabbitmqRiver.java:240)
    at org.elasticsearch.common.xcontent.XContentFactory.xContent(XContentFactory.java:147)
    at org.elasticsearch.action.bulk.BulkRequest.add(BulkRequest.java:92)
    at org.elasticsearch.action.bulk.BulkRequestBuilder.add(BulkRequestBuilder.java:81)
    at org.elasticsearch.river.rabbitmq.RabbitmqRiver$Consumer.run(RabbitmqRiver.java:240)
    at java.lang.Thread.run(Thread.java:636)

After that, the river stops bulk-inserting documents into ES. We always have to restart it by deleting the river and creating it again, sometimes even under a different name, otherwise it will not reconnect to the RabbitMQ server.
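For reference, our restart cycle boils down to deleting the river document and indexing the settings back in over the HTTP API. A minimal sketch of what we run, assuming the node's HTTP API is on localhost:9200 and using the river name my_river8 from the log line above (the settings string here is trimmed to the type only; in practice we send the full document shown below):

    import java.io.OutputStream;
    import java.net.HttpURLConnection;
    import java.net.URL;

    public class RiverRestart {
        public static void main(String[] args) throws Exception {
            // Delete the river; this stops it and removes its _meta document.
            HttpURLConnection del = (HttpURLConnection)
                    new URL("http://localhost:9200/_river/my_river8").openConnection();
            del.setRequestMethod("DELETE");
            System.out.println("DELETE _river/my_river8 -> " + del.getResponseCode());
            del.disconnect();

            // Recreate it by indexing the settings into _river/<name>/_meta.
            // Trimmed to the type only here; use the full settings document.
            String settings = "{\"type\":\"rabbitmq\"}";
            HttpURLConnection put = (HttpURLConnection)
                    new URL("http://localhost:9200/_river/my_river8/_meta").openConnection();
            put.setRequestMethod("PUT");
            put.setDoOutput(true);
            try (OutputStream os = put.getOutputStream()) {
                os.write(settings.getBytes("UTF-8"));
            }
            System.out.println("PUT _river/my_river8/_meta -> " + put.getResponseCode());
            put.disconnect();
        }
    }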

These are our river settings:
{
  "type": "rabbitmq",
  "rabbitmq": {
    "host": "HOST_IP",
    "port": 5672,
    "user": "USERNAME",
    "pass": "PASSWORD",
    "vhost": "/",
    "queue": "SOME_NAME",
    "exchange": "SOME_NAME",
    "routing_key": "SOME_NAME",
    "exchange_type": "SOME_NAME",
    "exchange_durable": true,
    "queue_durable": true,
    "queue_auto_delete": false
  },
  "index": {
    "bulk_size": 100,
    "bulk_timeout": "10s",
    "ordered": true
  }
}
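In case it matters: the messages we publish are in the Elasticsearch bulk API format (an action line followed by a source line, each newline-terminated), which as far as we understand is what the river expects. A minimal version of our publisher with the RabbitMQ Java client, where the connection values match the placeholders above and myindex/mytype are made up:

    import com.rabbitmq.client.Channel;
    import com.rabbitmq.client.Connection;
    import com.rabbitmq.client.ConnectionFactory;

    public class BulkPublisher {
        public static void main(String[] args) throws Exception {
            ConnectionFactory factory = new ConnectionFactory();
            factory.setHost("HOST_IP");
            factory.setUsername("USERNAME");
            factory.setPassword("PASSWORD");
            factory.setVirtualHost("/");

            Connection conn = factory.newConnection();
            Channel channel = conn.createChannel();

            // The river consumes message bodies in _bulk format: one action
            // line, then one source line, each terminated by a newline.
            String body =
                "{\"index\":{\"_index\":\"myindex\",\"_type\":\"mytype\",\"_id\":\"1\"}}\n" +
                "{\"field\":\"value\"}\n";

            channel.basicPublish("SOME_NAME", "SOME_NAME", null, body.getBytes("UTF-8"));

            channel.close();
            conn.close();
        }
    }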

Any ideas why? Is there any way to see which bulk insert caused this error? And why does the river not reconnect on its own?
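The river only logs the delivery tag, so one idea we had as a workaround (this is not a river feature) is to validate each payload on the producer side before publishing, for example with Jackson, and log anything that is not line-by-line JSON:

    import com.fasterxml.jackson.databind.ObjectMapper;

    public class BulkValidator {
        private static final ObjectMapper MAPPER = new ObjectMapper();

        // Returns true if every non-empty line parses as standalone JSON,
        // which is what the bulk format requires.
        public static boolean isValidBulk(String payload) {
            for (String line : payload.split("\n")) {
                if (line.trim().isEmpty()) continue;
                try {
                    MAPPER.readTree(line);
                } catch (Exception e) {
                    System.err.println("Bad bulk line: " + line + " (" + e.getMessage() + ")");
                    return false;
                }
            }
            return true;
        }
    }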
Thank you & keep up the great work!
Max


Strange... I can see that it acks the failed message (which happens when the bulk indexing message fails to parse). What I can also see is that in this case a delivery tag will be ack'ed twice: once for the failure, and again later on. I can fix that, though I am not sure why it would cause RabbitMQ to stop sending messages...
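To illustrate the suspicion (this is not the river's actual code): acking the same delivery tag twice on one channel is a protocol error, and RabbitMQ reacts by closing the channel with PRECONDITION_FAILED, which would explain why no further messages arrive until the river is recreated. A consumer-side guard would look roughly like:

    import java.io.IOException;
    import java.util.Set;
    import java.util.concurrent.ConcurrentHashMap;

    import com.rabbitmq.client.Channel;

    // Illustration only -- not the river's code. Acking the same delivery
    // tag twice on one channel is a protocol error; RabbitMQ closes the
    // channel, so the consumer silently stops receiving messages.
    public class AckGuard {
        private final Set<Long> acked = ConcurrentHashMap.newKeySet();

        public void safeAck(Channel channel, long deliveryTag) throws IOException {
            // Only ack a tag that has not been acked before.
            if (acked.add(deliveryTag)) {
                channel.basicAck(deliveryTag, false);
            }
        }
    }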

Here is the issue:
https://github.com/elasticsearch/elasticsearch-river-rabbitmq/issues/9.

Great, thank you very much for the fast help!

Max
