I have a RabbitMQ stream whose events I want Logstash to process.
It mostly works, but Logstash only processes records that are pushed to the stream while Logstash is running.
If I push messages before Logstash is started, or while it's down for maintenance, those messages are never processed.
Can I configure the rabbitmq input plugin to start reading from the beginning of the stream?
Logstash is running from the image docker.elastic.co/logstash/logstash:7.17.0
The input is defined as:
input {
  rabbitmq {
    id => "rabbitmq_1"
    host => "load-balancer"
    port => 5672
    vhost => "/"
    queue => "AllEvents"
    host => "localhost"
    exchange => "central_header_exchange"
    codec => "json"
    durable => true
    user => "guest"
    password => "guest"
    arguments => {
      "x-initial-cluster-size" => 3
      "x-max-age" => "7D"
      "x-queue-type" => "stream"
      "x-ha-policy" => "all"
      "x-stream-offset" => "first"
    }
    metadata_enabled => "extended"
  }
}
According to this doc, setting the "x-stream-offset" => "first" argument on the consumer should make this work, but I'm not sure whether I've put it in the right place in the plugin config.
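To show the distinction I mean, here is a minimal Python sketch of my understanding, written against a pika-style channel (an assumption on my part; this is not the Logstash plugin's code). The helper name consume_stream_from_start and the queue name used with it are hypothetical. Queue arguments such as "x-queue-type" go on the queue declaration, while "x-stream-offset" goes on the consume call:

```python
def consume_stream_from_start(channel, queue_name, on_message, prefetch=100):
    """Declare a stream queue and subscribe to it from its first message.

    `channel` is assumed to be a pika-style channel, i.e. any object that
    provides queue_declare, basic_qos and basic_consume.
    """
    # Queue arguments describe the queue itself and are sent with queue.declare.
    queue_arguments = {"x-queue-type": "stream"}
    channel.queue_declare(queue=queue_name, durable=True, arguments=queue_arguments)

    # Stream consumers need a prefetch limit and manual acknowledgements.
    channel.basic_qos(prefetch_count=prefetch)

    # Consumer arguments are sent with basic.consume; the offset belongs here.
    consumer_arguments = {"x-stream-offset": "first"}
    channel.basic_consume(
        queue=queue_name,
        on_message_callback=on_message,
        auto_ack=False,
        arguments=consumer_arguments,
    )
    return queue_arguments, consumer_arguments
```

With pika, the channel would come from pika.BlockingConnection(...).channel(), and channel.start_consuming() would follow the call. What I can't tell is whether the plugin's arguments option ends up on the declare side, the consume side, or both.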
Specifically, I'm trying to figure out how to configure the rabbitmq input plugin to pass consumer_arguments down to the underlying March Hare code, the way this Python (kombu) example does:
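from kombu import Connection, Consumer, Exchange, Queue

def callback(body, message):
    message.ack()  # stream consumers must acknowledge manually

with Connection('amqp://test:test@localhost:5672//') as conn:
    with Consumer(conn, queues=[Queue('my-test',
            exchange=Exchange('predictions', type='fanout', durable=False), durable=True,
            queue_arguments={'x-queue-type': 'stream'},      # sent when the queue is declared
            consumer_arguments={'x-stream-offset': 0})],     # sent when consuming starts
            prefetch_count=1000, callbacks=[callback]):
        conn.drain_events()  # wait for and dispatch one event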
Thanks