Consume data from sharded RabbitMQ queue



I would like to set up a Logstash instance to consume data from a sharded RabbitMQ queue.

I am using RabbitMQ 3.3.5 and Logstash 5.2 right now. I installed the rabbitmq-sharding plugin, as it is not included in this version.

For RabbitMQ I set the policy to define the sharding on my test exchange:
rabbitmqctl set_policy shard_test_exchange-shard "^shard_test_exchange$" '{"shards-per-node": 7, "routing-key": "logstash"}' --apply-to exchanges

I loaded the data into the queue using the following Logstash config:
/usr/share/logstash/bin/logstash -e "input { file { start_position => 'beginning' path => '/home/myuser/testinput.txt' codec => 'plain' sincedb_path => '/opt/myapp/rabbitmqsharding' } } output{ rabbitmq { host => 'myhostname' exchange => 'shard_test_exchange' exchange_type => 'x-modulus-hash' key => '%{@timestamp}' persistent => true workers => 1 durable => true password => 'admin_password' user => 'admin_user' } }"

The exchange was created and I could see the sharded queue in RabbitMQ.

Then I tried to run another Logstash instance and set up its input plugin to consume data from this sharded queue:
/usr/share/logstash/bin/logstash -e "input { rabbitmq { host => 'myhostname' queue => 'shard_test_exchange' passive => true prefetch_count => 4000 password => 'admin_password' user => 'admin_user' debug =>true } } output{ stdout{} }"

(The RabbitMQ documentation says that the queue name should be the name of the exchange.)

I always get the following error message:
13:22:42.160 [[main]<rabbitmq] WARN logstash.inputs.rabbitmq - Error while setting up connection for rabbitmq input! Will retry. {:message=>"#method<connection.close>(reply-code=541, reply-text=INTERNAL_ERROR - Interceptor: rabbit_sharding_interceptor failed with reason: \"Can't declare sharded queue: <<\\\"shard_test_exchange\\\">>\", class-id=50, method-id=10)", :class=>"MarchHare::ChannelAlreadyClosed", :location=>"/usr/share/logstash/vendor/bundle/jruby/1.9/gems/march_hare-2.22.0-java/lib/march_hare/exceptions.rb:121:in `convert_and_reraise'"}

It seems that the Logstash rabbitmq input plugin tries to declare the queue but cannot, because it already exists. How can I configure Logstash to connect to the sharded queue?


(Ry Biesemeyer) #2

I believe that your input should mirror some of the settings from the output, namely exchange and exchange_type; I believe that the queue directive is a bit of a misnomer, in that it only names the local queue and is optional.


Hi @yaauie,

Thank you for the idea. I tried adding the exchange => 'shard_test_exchange' exchange_type => 'x-modulus-hash' parameters, but the result is the same. (I also tried to delete the queue, and the passive setting too, but no luck.)

It seems that the plugin tries to declare the queue (in active or passive mode), and that this is somehow not compatible with sharded queues. Can it be avoided? In this case the queue is created along with the exchange, so when the input plugin (the consumer) starts, it could simply use the existing queue.

I need sharding in RabbitMQ for performance reasons: a single queue cannot handle the rate of incoming and outgoing data, because RabbitMQ uses 1 CPU per queue. I use multiple threads in the input plugin (consumers that read from the queue), but without sharding that doesn't help much. I don't think this is a unique problem, so I hope I am just missing something in the parameterization.

Do you have any idea how I can solve this issue and connect to the queue?

(Ry Biesemeyer) #4

Unfortunately, I do not know much about RabbitMQ, but the documentation for the plugin seems to indicate that the sharding is an internal feature, and that consumers attach to the exchange in the normal way with a notable exception: you need at least as many consumers as shards, or else you risk messages from one partition falling behind.

There is a bold note in the docs about not declaring the same queue name in your consumer as in your producer as well:


This is exactly what I found, and that's why I thought the Logstash input plugin should handle it without any issues (if I give it the name of the exchange in the queue parameter). However, it seems that the rabbitmq input plugin tries to create the queue during setup (according to the source code). The only option I found was passive, but it doesn't change anything.

Do you know how I can modify the parameters? Is there any way to avoid the queue creation? Or should I create a custom, modified plugin to handle this case (i.e. remove the call to declare_queue! from the code)?

(Ry Biesemeyer) #6

I would at least open up an issue on the project indicating that it appears that its use of the RabbitMQ APIs is preventing it from working with queues that are sharded.

If you can also hack on the project to figure out what bit is interfering, I'm sure our Logstash Integrations team will be able to assist in getting a pull-request into shape that solves your use-case without breaking everyone else's. That way, you won't have to maintain a diverged fork of the project or have the overhead of installing custom plugins on your production machines.

(Thomas Davis) #7

This works with Logstash 6.x. I just got it working.

Create the exchange and set the policy.

For every queue that is created, create the following logstash entry:

input { rabbitmq { host => "rabbit" port => 5672 exchange => "sharded-exchange" queue => "sharding: sharded-exchange - rabbit@rabbit-1 - 0" passive => true durable => true user => "logstash" password => "password" codec => "json" } }

You change the 'queue' name to match exactly the sharded queue you want to pull from.

If you have 2 RabbitMQ hosts, and you've sharded 4 times, you need to repeat this 4 times, using each RabbitMQ host and queue shard number in the queue name.
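
Writing one input per shard by hand gets tedious, so here is a minimal Python sketch that generates the entries. It assumes the shard queues follow the naming pattern shown in the example above ("sharding: <exchange> - <node> - <index>") and that the same number of shards exists on every node; check the actual queue names on your broker before relying on it. The function names, the node names, and the shard counts are made up for illustration.

```python
def shard_queue_names(exchange, nodes, shards_per_node):
    """List shard queue names, ASSUMING the pattern
    'sharding: <exchange> - <node> - <index>' seen in the example above."""
    return [
        f"sharding: {exchange} - {node} - {index}"
        for node in nodes
        for index in range(shards_per_node)
    ]


def logstash_input(host, exchange, queue):
    """Render one rabbitmq input entry that attaches to a single shard queue."""
    return (
        "rabbitmq { "
        f'host => "{host}" '
        f'exchange => "{exchange}" '
        f'queue => "{queue}" '
        "passive => true durable => true "
        "}"
    )


def render_inputs(host, exchange, nodes, shards_per_node):
    """Render a complete input section with one entry per shard queue."""
    entries = [
        logstash_input(host, exchange, queue)
        for queue in shard_queue_names(exchange, nodes, shards_per_node)
    ]
    return "input {\n" + "\n".join("  " + entry for entry in entries) + "\n}"


if __name__ == "__main__":
    # Hypothetical node names and shard count; replace with your own.
    print(render_inputs("rabbit", "sharded-exchange",
                        ["rabbit@rabbit-1", "rabbit@rabbit-2"], 2))
```

Add user, password, codec and any other settings you need to each entry; they are left out here to keep the sketch short.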
