Getting exceptions trying to consume messages produced by logstash from rabbitmq


(arimus) #1

Any time I add a message to the queue that elasticsearch is subscribed to
(the one in the README), I get this exception in the elasticsearch logs.
These messages are being pushed to rabbitmq via the logstash 1.1.0 amqp
output plugin.

Am I correct in assuming that this is okay even though logstash is talking
AMQP 0.8 and the river plugin is connected via AMQP 0.9.1? Is there perhaps
a problem with the message content?

FYI, I also tried to roll back to elasticsearch 0.18.7 and got the same
result. And I've upgraded to 0.19.2 to try that as well. Probably something
I am doing that is silly, but I can't tell what. Any ideas?

[2012-04-04 19:04:05,777][WARN ][river.rabbitmq ] [Astron] [rabbitmq][my_river] failed to execute bulk for delivery tags [[20]], not ack'ing
org.elasticsearch.action.ActionRequestValidationException: Validation Failed: 1: no requests added;
    at org.elasticsearch.action.ValidateActions.addValidationError(ValidateActions.java:29)
    at org.elasticsearch.action.bulk.BulkRequest.validate(BulkRequest.java:259)
    at org.elasticsearch.action.support.TransportAction.execute(TransportAction.java:55)
    at org.elasticsearch.client.node.NodeClient.execute(NodeClient.java:83)
    at org.elasticsearch.client.support.AbstractClient.bulk(AbstractClient.java:141)
    at org.elasticsearch.action.bulk.BulkRequestBuilder.doExecute(BulkRequestBuilder.java:128)
    at org.elasticsearch.action.support.BaseRequestBuilder.execute(BaseRequestBuilder.java:53)
    at org.elasticsearch.river.rabbitmq.RabbitmqRiver$Consumer.run(RabbitmqRiver.java:283)
    at java.lang.Thread.run(Thread.java:679)

Issue created as well, but can remove if this is purely user error. In any
case, the exception hasn't led me to an answer yet.

https://github.com/elasticsearch/elasticsearch-river-rabbitmq/issues/8


(arimus) #2

FYI, I also upgraded logstash to use AMQP 0.9.1 just in case, with
the same result. So I assume that it's a data format problem of some
sort? Here is the data from the queue:

Exchange elasticsearch
Routing Key elasticsearch
Redelivered ●
Properties
    priority: 0
    delivery_mode: 2
    content_type: application/octet-stream
Payload
    320 bytes
    Encoding: string

{"@source":"tcp://127.0.0.1:6999/client/127.0.0.1:52724","@type":"linux-syslog","@tags":[],"@fields":{},"@timestamp":"2012-04-03T17:50:35.893000Z","@source_host":"127.0.0.1","@source_path":"/client/127.0.0.1:52724","@message":"Apr 3 03:19:17 localhost sm-msp-queue[1136]: starting daemon (8.14.5): queueing@01:00:00\n"}
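
For reference, the river's README describes the messages it consumes as being in the elasticsearch bulk API format: an action line, a newline, then the document source and a trailing newline. The payload above is a bare JSON document, which would be consistent with the "no requests added" error. Here is a minimal Ruby sketch of wrapping an event that way. The `to_bulk_payload` helper and the `logstash` / `linux-syslog` index and type names are made up for illustration; they are not taken from logstash or the river.

```ruby
require "json"

# Hypothetical helper (not part of logstash or the river): wrap one event
# hash in bulk API format, i.e. an action line followed by a source line.
def to_bulk_payload(event, index, type)
  action = { "index" => { "_index" => index, "_type" => type } }
  # Every line, including the last one, must end with a newline.
  JSON.generate(action) + "\n" + JSON.generate(event) + "\n"
end

# Trimmed-down event; the field values here are placeholders.
event = {
  "@type"    => "linux-syslog",
  "@message" => "starting daemon (8.14.5)"
}

payload = to_bulk_payload(event, "logstash", "linux-syslog")
puts payload
```

A message published in this shape gives the bulk request an action and a source to add, whereas a lone JSON document gives it nothing to index.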



(arimus) #3

Alright, it was indeed a simple fix. Somewhere in all my configuration
changes, I ended up using the amqp output plugin for logstash instead of
the elasticsearch_river plugin =P Even so, a patch was still needed to
get things working. The current logstash river module is broken, although
there is a thread from a couple of months ago discussing some fixes. In
any case, this will get you up and running!

    diff --git a/lib/logstash/outputs/elasticsearch_river.rb b/lib/logstash/outputs/elasticsearch_river.rb
    index f63619a..b34e91a 100644
    --- a/lib/logstash/outputs/elasticsearch_river.rb
    +++ b/lib/logstash/outputs/elasticsearch_river.rb
    @@ -99,7 +99,6 @@ class LogStash::Outputs::ElasticSearchRiver < LogStash::Outputs::Base
           "user" => [@user],
           "password" => [@password],
           "exchange_type" => [@exchange_type],
    -      "queue_name" => [@name],
           "name" => [@exchange],
           "key" => [@key],
           "vhost" => [@vhost],

And for good measure if you want to be running with AMQP 0.9.1 and the
latest elasticsearch (will be submitting these back to logstash):

    diff --git a/Makefile b/Makefile
    index 1259654..7879012 100644
    --- a/Makefile
    +++ b/Makefile
    @@ -4,7 +4,7 @@
     #   wget
     #
     JRUBY_VERSION=1.6.7
    -ELASTICSEARCH_VERSION=0.18.7
    +ELASTICSEARCH_VERSION=0.19.2
     JODA_VERSION=2.1
     VERSION=$(shell ruby -r./lib/logstash/version -e 'puts LOGSTASH_VERSION')

    diff --git a/lib/logstash/outputs/amqp.rb b/lib/logstash/outputs/amqp.rb
    index a33c628..ef80901 100644
    --- a/lib/logstash/outputs/amqp.rb
    +++ b/lib/logstash/outputs/amqp.rb
    @@ -54,6 +54,9 @@ class LogStash::Outputs::Amqp < LogStash::Outputs::Base
       # Validate SSL certificate
       config :verify_ssl, :validate => :boolean, :default => false
     
    +  # AMQP spec
    +  config :spec, :validate => :string, :default => "09"
    +
       public
       def register
         require "bunny" # rubygem 'bunny'
    @@ -69,6 +72,7 @@ class LogStash::Outputs::Amqp < LogStash::Outputs::Base
           :host => @host,
           :port => @port,
           :logging => @debug,
    +      :spec => @spec,
         }
         amqpsettings[:user] = @user if @user
         amqpsettings[:pass] = @password.value if @password
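
To make the effect of that last patch easier to see outside of logstash, here is a rough stand-alone sketch of the settings hash that `register` builds once `:spec` is added. The `amqp_settings` method is a made-up name for illustration, and it assumes the Bunny 0.x client, where, as far as I know, `:spec => "09"` selects AMQP 0.9.1 and `"08"` selects AMQP 0.8.

```ruby
# Hypothetical stand-alone version of the hash built in register;
# the method name and argument order are assumptions for this sketch.
def amqp_settings(host, port, spec = "09", user = nil, password = nil)
  settings = {
    :host => host,
    :port => port,
    :spec => spec, # assumed Bunny 0.x option: "09" for AMQP 0.9.1, "08" for 0.8
  }
  # Credentials are only set when given, mirroring the plugin code.
  settings[:user] = user if user
  settings[:pass] = password if password
  settings
end

settings = amqp_settings("localhost", 5672)
# With the bunny gem installed, this hash would be handed to Bunny.new(settings).
p settings
```

Defaulting the spec to "09" matches the default in the patch, so existing configs pick up AMQP 0.9.1 without changes.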

