Consume data from sharded RabbitMQ queue #2


(Leandro de Souza) #1

I'm testing the RabbitMQ sharding plugin: https://github.com/rabbitmq/rabbitmq-sharding

I created a test scenario based on this question: "Consume data from sharded RabbitMQ queue".

In summary, I have these instances:
2 producers (Logstash 6.6.3)
1 RabbitMQ 3.6.11
2 consumers (Logstash 6.6.3)

Pipeline for producer 1:
input {
  heartbeat {
    interval => 1
    type => "heartbeat"
    message => "%{@timestamp}"
  }
}
output {
  rabbitmq {
    host => 'rabbitmq:5672'
    exchange => 'events.sharded'
    exchange_type => 'x-modulus-hash'
    key => 'producer-1'
    persistent => true
    workers => 1
    durable => true
    password => '123456'
    user => 'logstash-producer'
  }
}

Pipeline for producer 2 (same as producer 1 except for the routing key):
input {
  heartbeat {
    interval => 1
    type => "heartbeat"
    message => "%{@timestamp}"
  }
}
output {
  rabbitmq {
    host => 'rabbitmq:5672'
    exchange => 'events.sharded'
    exchange_type => 'x-modulus-hash'
    key => 'producer-2'
    persistent => true
    workers => 1
    durable => true
    password => '123456'
    user => 'logstash-producer'
  }
}
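With exchange_type => 'x-modulus-hash', the sharding plugin routes each message by hashing its routing key and taking the result modulo the number of shard queues bound to the exchange. Because each producer above uses a fixed key, every message from one producer should land in the same shard. A minimal sketch of that idea, assuming FNV-1a as a stand-in hash (RabbitMQ uses its own Erlang hash, so real shard indices will differ); fnv1a and pickShard are hypothetical helpers:

```javascript
// Stand-in 32-bit FNV-1a hash. RabbitMQ's x-modulus-hash uses a different
// (Erlang) hash, so only the behaviour matters here, not the exact indices.
function fnv1a(str) {
  let hash = 0x811c9dc5;
  for (const byte of Buffer.from(str, "utf8")) {
    hash ^= byte;
    hash = Math.imul(hash, 0x01000193) >>> 0;
  }
  return hash >>> 0;
}

// Pick a shard the way a modulus-hash exchange does: hash(key) mod N.
function pickShard(routingKey, shardCount) {
  return fnv1a(routingKey) % shardCount;
}

// A fixed routing key always maps to the same shard...
for (let i = 0; i < 3; i++) {
  console.log("producer-1 ->", pickShard("producer-1", 2));
}

// ...while varying keys spread messages across the shards.
const counts = [0, 0];
for (let i = 0; i < 1000; i++) {
  counts[pickShard(`event-${i}`, 2)]++;
}
console.log("per-shard counts for varying keys:", counts);
```

With only two fixed keys, whether both producers hit the same shard or different ones depends on just two hash values; varying the routing key per message gives the hash more to work with.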

In RabbitMQ, I added a policy:
rabbitmqctl set_policy events-shard "^events.sharded$" '{"shards-per-node": 2}'

So, when I added an exchange of type x-modulus-hash, RabbitMQ created 2 queues.


Pipeline for consumer 1 and consumer 2 (both identical):
input {
  rabbitmq {
    host => 'rabbitmq:5672'
    queue => 'events.sharded'
    passive => true
    durable => true
    prefetch_count => 2
    password => '123456'
    user => 'logstash-consumer'
    threads => 10
  }
}
output {
  stdout {}
}

When I start the producers, everything works as expected: there are messages in both queues.


However, when I start the consumers, they only connect to the first queue.

After starting the first consumer:




After starting the second consumer:




Any suggestions on how to make the consumers connect to both queues?


(Ry Biesemeyer) #2

I am not an expert on RabbitMQ, but some of the caveats called out in the Sharding Plugin's docs are disconcerting:

How does it work? The plugin will chose the queue from the shard with the least amount of consumers, provided the queue contents are local to the broker you are connected to.

NOTE: there's a small race condition between RabbitMQ updating the queue's internal stats about consumers and when clients issue basic.consume commands. The problem with this is that if your client issue many basic.consume commands without too much time in between, it might happen that the plugin assigns the consumers to queues in an uneven way.

-- RabbitMQ Sharding Plugin

Essentially, the plugin does not guarantee that 100% of your pseudo-queues will be consumed, even if you create a sufficient quantity of consumers.
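The race described in the quoted note can be illustrated with a toy simulation: each basic.consume picks the shard with the fewest consumers, but the consumer counts it reads are a snapshot that is refreshed only periodically. This is a hypothetical model of the documented caveat, not the plugin's actual code; simulate and refreshEvery are made-up names.

```javascript
// Toy model: each basic.consume is routed to the shard whose *reported*
// consumer count is lowest. The reported counts are a stale snapshot that
// is only refreshed every `refreshEvery` consume calls.
function simulate(shardCount, consumeCalls, refreshEvery) {
  const actual = new Array(shardCount).fill(0);
  let reported = actual.slice();
  for (let call = 0; call < consumeCalls; call++) {
    if (call % refreshEvery === 0) {
      reported = actual.slice(); // stats catch up with reality
    }
    // Shard with the fewest consumers according to the snapshot
    // (ties go to the lowest index).
    let best = 0;
    for (let s = 1; s < shardCount; s++) {
      if (reported[s] < reported[best]) best = s;
    }
    actual[best]++;
  }
  return actual;
}

// Stats refreshed before every call: 10 consumers split evenly.
console.log(simulate(2, 10, 1)); // [ 5, 5 ]
// Stats never refreshed during a burst of 10 calls: all go to shard 0.
console.log(simulate(2, 10, 10)); // [ 10, 0 ]
```

In this model, a burst of consume calls like the one a single Logstash input with threads => 10 issues at startup could all end up on one shard.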


(Leandro de Souza) #3

Hey Biesemeyer, thanks for answering.
I read that note in the RabbitMQ plugin docs and ran another simple test.

I started only one Logstash consumer, and in the RabbitMQ management UI I can see 10 consumers on the first queue.

I waited a few minutes and then started the second consumer, but it still connects only to the first queue.

So I ran another test with Node.js consumers:

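var amqp = require('amqplib/callback_api');

amqp.connect('amqp://nodejs-consumer:1234@rabbitmq:5672', function (err, conn) {
    if (err) {
        console.error(err);
        return;
    }

    conn.createChannel(function (err, ch) {
        if (err) {
            console.error(err);
            return;
        }

        // Same name as the sharded exchange; the sharding plugin picks
        // one of the shard queues for this consumer.
        var q = 'events.sharded';
        ch.assertQueue(q, { durable: true });
        // prefetch only limits unacknowledged deliveries, so it has no
        // effect while noAck is true.
        ch.prefetch(1);
        console.log(" [*] Waiting for messages in %s. To exit press CTRL+C", q);
        ch.consume(q, function (msg) {
            if (msg === null) return; // consumer was cancelled by the broker
            console.log(" [x] Received %s", msg.content.toString());
        }, { noAck: true });
    });
});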

In this case, the consumers are distributed across both queues.

I looked for a relevant setting in the Logstash RabbitMQ input plugin, but I didn't find one.
I only found this open issue in the plugin repository: https://github.com/logstash-plugins/logstash-input-rabbitmq/issues/29


(Ry Biesemeyer) #4

Again, this plugin is a RabbitMQ-internal mechanism that is not supposed to be visible to consumers in any way; it does not add any options to the APIs that consumers use to connect. I don't know why it is failing to route consumers, but that is outside the control of the Logstash RabbitMQ Input.

The caveats in the RabbitMQ Sharding plugin's documentation, which state that it does not guarantee that consumers will be routed to all queues, would be enough for me not to use it in any production environment.


(Leandro de Souza) #5

Hey Biesemeyer, thanks again for answering.

I'm testing this RabbitMQ plugin because I need to spread messages of the same type across several queues. The idea is to increase throughput and use more than one CPU for each RabbitMQ node.

I ran another test in which I created the RabbitMQ queues manually.
The problem is that I have to set up a Logstash pipeline for each queue, because the input settings accept only one queue name.

Do you have any suggestions to reduce this effort?
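When the shard queues are created manually, a consumer outside Logstash can attach to all of them from one channel. A minimal Node.js sketch, assuming amqplib's promise API (checkQueue, consume, ack) and hypothetical queue names events.sharded.1 and events.sharded.2; consumeAll is a made-up helper, and the channel is passed in so the logic can be exercised without a broker:

```javascript
// Subscribe one handler to several queues on the same channel.
// `channel` is expected to be an amqplib channel, but any object with
// checkQueue(queue), consume(queue, onMessage) and ack(msg) works.
async function consumeAll(channel, queues, onMessage) {
  const tags = [];
  for (const queue of queues) {
    // Passive check: rejects if the queue was not created beforehand.
    await channel.checkQueue(queue);
    const { consumerTag } = await channel.consume(queue, (msg) => {
      if (msg === null) return; // consumer was cancelled by the broker
      onMessage(queue, msg);
      channel.ack(msg); // manual ack after the handler has run
    });
    tags.push(consumerTag);
  }
  return tags;
}
```

With a real broker, the channel would come from require('amqplib').connect(url) followed by conn.createChannel(), and calling consumeAll(ch, ['events.sharded.1', 'events.sharded.2'], handler) would register one consumer per queue.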


(Leandro de Souza) #6

I ran another test to reduce the effort of maintaining so many pipelines.

I added more than one rabbitmq input to the same pipeline:
input {
  rabbitmq {
    host => 'rabbitmq:5672'
    queue => 'events.sharded.1'
    passive => true
    durable => true
    password => '123456'
    user => 'logstash-consumer'
    threads => 10
  }
  rabbitmq {
    host => 'rabbitmq:5672'
    queue => 'events.sharded.2'
    passive => true
    durable => true
    password => '123456'
    user => 'logstash-consumer'
    threads => 10
  }
  rabbitmq {
    host => 'rabbitmq:5672'
    queue => 'events.sharded.n'
    passive => true
    durable => true
    password => '123456'
    user => 'logstash-consumer'
    threads => 10
  }
}
output {
  stdout {}
}
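Since the repeated rabbitmq blocks differ only in the queue name, they could be generated instead of maintained by hand. A small Node.js sketch, assuming the queues follow the events.sharded.<n> naming used above and copying the other settings from that config; renderValue and renderInputs are hypothetical helpers, and their output would be written into the pipeline's config file:

```javascript
// Settings shared by every rabbitmq input, copied from the pipeline above.
const common = {
  host: "rabbitmq:5672",
  passive: true,
  durable: true,
  password: "123456",
  user: "logstash-consumer",
  threads: 10,
};

// Render a value as Logstash config expects it:
// strings in single quotes, booleans and numbers bare.
function renderValue(value) {
  return typeof value === "string" ? `'${value}'` : String(value);
}

// Build an `input { ... }` section with one rabbitmq block per queue.
function renderInputs(queues, settings) {
  const blocks = queues.map((queue) => {
    // `queue` first, then the shared settings, to mirror the layout above.
    const lines = Object.entries({ queue, ...settings }).map(
      ([key, value]) => `    ${key} => ${renderValue(value)}`
    );
    return ["  rabbitmq {", ...lines, "  }"].join("\n");
  });
  return ["input {", ...blocks, "}"].join("\n");
}

const shardQueues = [1, 2, 3].map((n) => `events.sharded.${n}`);
console.log(renderInputs(shardQueues, common));
```

The output section (stdout {}) would be appended unchanged; adding a shard then means adding one name to the list rather than another copied block.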

What do you think about it?