Can the Logstash RabbitMQ input plugin read from the start of a stream?

I have a RabbitMQ stream whose events I want Logstash to process.

I can mostly get it working, but Logstash only processes records that are pushed to the stream while Logstash is running.

If I push messages before Logstash is started, or while it's down for maintenance, those messages are lost.

My question is whether I can configure the RabbitMQ input plugin to also start from the beginning of the stream.

My Logstash is running from the image docker.elastic.co/logstash/logstash:7.17.0.

The input is defined as:

input {
    rabbitmq {
        id => "rabbitmq_1"
        host => "load-balancer"
        port => 5672
        vhost => "/"
        queue => "AllEvents"
        exchange => "central_header_exchange"
        codec => "json"
        durable => true
        user => "guest"
        password => "guest"
        arguments => {
            "x-initial-cluster-size" => 3
            "x-max-age" => "7D"
            "x-queue-type" => "stream"
            "x-ha-policy" => "all"
            "x-stream-offset" => "first"
        }
        metadata_enabled => "extended"
    }
}
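
For context, my understanding is that the plugin's arguments option is passed when the queue is declared, i.e. they are queue arguments. In plain Python that declaration would look roughly like this (a pika sketch I put together for comparison; host and credentials are placeholders):

import pika

conn = pika.BlockingConnection(pika.ConnectionParameters('localhost'))
ch = conn.channel()
ch.queue_declare(
    queue='AllEvents',
    durable=True,
    arguments={                      # queue arguments, fixed at declaration
        'x-queue-type': 'stream',
        'x-max-age': '7D',
        'x-initial-cluster-size': 3,
        # 'x-stream-offset' is, as I understand it, a *consumer* argument,
        # so listing it here wouldn't affect where consumption starts
    },
)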

According to this doc, setting the "x-stream-offset" => "first" argument on the consumer should make this work, but I'm not sure whether I've put it in the right place in the plugin config.
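
For comparison, a raw Python consumer that does replay from the beginning would pass the offset to basic_consume as a consumer argument, something like this (a pika sketch, untested against my cluster):

import pika

def on_message(channel, method, properties, body):
    print(body)
    channel.basic_ack(method.delivery_tag)

conn = pika.BlockingConnection(pika.ConnectionParameters('localhost'))
ch = conn.channel()
ch.basic_qos(prefetch_count=100)  # streams require a prefetch limit
ch.basic_consume(
    queue='AllEvents',
    on_message_callback=on_message,
    arguments={'x-stream-offset': 'first'},  # consumer argument: start at the beginning
)
ch.start_consuming()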

Specifically, I'm trying to figure out how to configure the RabbitMQ input plugin to pass consumer arguments down to the underlying March Hare code, similar to what Kombu's consumer_arguments option does in Python, e.g.:

from kombu import Connection, Consumer, Exchange, Queue

def callback(body, message):
    message.ack()

with Connection('amqp://test:test@localhost:5672//') as conn:
    with Consumer(conn, queues=[Queue('my-test',
            exchange=Exchange('predictions', type='fanout', durable=False), durable=True,
            queue_arguments={'x-queue-type': 'stream'},
            consumer_arguments={'x-stream-offset': 0})],
            prefetch_count=1000, callbacks=[callback]):
        conn.drain_events()
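
In Kombu terms, what I'm missing from the Logstash plugin is an equivalent of consumer_arguments: the plugin's arguments option looks like queue_arguments (applied at declaration), but I can't find a setting that gets applied at basic.consume time.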

Thanks
