RabbitMQ output configuration - alternate-exchange - help needed

Hi,
I'm trying out RabbitMQ (instead of Redis), and am typically creating the exchanges, bindings, queues in advance, then configuring logstash to match the existing config.

There is one configuration I just tried now which is the option for an exchange to have an alternate-exchange configured.

I do not know how to express that configuration in the logstash output plugin, and without it he generates an error:

:message=>"Pipeline aborted due to error", :exception=>#<MarchHare::PreconditionFailed: PRECONDITION_FAILED - inequivalent arg 'alternate-exchange' for exchange 'logstash' in vhost '/': received none but current is the value 'logstash-ae' of type 'longstr'>

My latest config attempt (still fails, previous ones didn't include any reference to the alternate-exchange):

  rabbitmq {
     exchange => "logstash"
     exchange_type => "direct"
     arguments => {"alternate-exchange" => "logstash-ae"}
     host => "localhost"
     port => 5672
     key => "logstash"
  }

Logstash Version: 2.3.4

any help would be greatly appreciated.

If you're pre-creating exchanges and everything, shouldn't you do a passive bind? And in that case, wouldn't specifying the alternate exchange be pointless?

Hi Magnus,
Thanks for your reply.

From what I can see in the plugin documentation and the related github projects:

  • rabbitmq input:
    there is indeed a passive option, but focused on the queue creation
    the exchange creation can be avoided by not providing exchange type

  • rabbitmq output:
    the documentation refers to a passive option, but there is no code implementation in relation to it, nor any information what it would cover,
    there is no way I see to bypass the exchange creation/validation

I do agree that an option to do an all passive bind would be the most suitable approach for my use case, avoiding redundant configuration.

I tried to "make it work" and have identified 3 modifications that are needed/missing:

logstash-mixin-rabbitmq_connection

[rabbitmq_connection.rb]

  def declare_exchange!(channel, exchange, exchange_type, durable, arguments)
    @logger.debug? && @logger.debug("Declaring an exchange", :name => exchange,
                  :type => exchange_type, :durable => durable)
    exchange = channel.exchange(exchange, :type => exchange_type.to_sym, :durable => durable, :arguments => arguments)
    @logger.debug? && @logger.debug("Exchange declared")
    exchange
  rescue StandardError => e
    @logger.error("Could not declare exchange!",
                  :exchange => exchange, :type => exchange_type,
                  :durable => durable, :arguments => arguments, :error_class => e.class.name,
                  :error_message => e.message, :backtrace => e.backtrace)
    raise e
  end

Method should include an additional parameter arguments

logstash-output-rabbitmq

[rabbitmq.rb]

  # Additional arguments for the exchange, for example could be the presence of an alternate exchange, ...
  config :arguments, :validate => :array, :default => {}

Additional parameter should be declared: arguments

...

  def register
    connect!
    @hare_info.exchange = declare_exchange!(@hare_info.channel, @exchange, @exchange_type, @durable, @arguments)
    @codec.on_event(&method(:publish))
  end

Method declare_exchange should be called with @arguments as additional parameter

To be thorough, it would be better to also make following alteration:

logstash-input-rabbitmq

[rabbitmq.rb]

  config :exchange_arguments, :validate => :array, :default => {}

Additional parameter should be declared: exchange_arguments
...

  def bind_exchange!
    if @exchange
      if @exchange_type # Only declare the exchange if @exchange_type is set!
        @logger.info? && @logger.info("Declaring exchange '#{@exchange}' with type #{@exchange_type}")
        @hare_info.exchange = declare_exchange!(@hare_info.channel, @exchange, @exchange_type, @durable, @exchange_arguments)
      end
      @hare_info.queue.bind(@exchange, :routing_key => @key)
    end
  end

Method declare_exchange should be called with @exchange_arguments as additional parameter

I have confirmed above changes are sufficient to allow for my situation to be resolved, but haven't run unit tests nor know too well (for now) the process for submitting such changes.

Still, an option for binding in a completely passive manner would be the best approach from my point of view at least.