RabbitMQ output plugin

Hello, I want to send data from Elasticsearch via Logstash to RabbitMQ. All components run in separate Docker containers.
I am new to ELK, so can you tell me what I am doing wrong?

Logstash.conf

  input {
    stdin { codec => rubydebug }
    elasticsearch {
      index => "camunda_task_event-%{+YYYY.MM.dd}"
      hosts => "${ELASTIC_HOSTS}"
      user => "${ELASTIC_USER}"
      password => "${ELASTIC_PASSWORD}"
      cacert => "certs/ca/ca.crt"
    }
  }

  output {
    rabbitmq {
      # connect to rabbit
      host => "host.docker.internal"
      port => 5672
      vhost => "/"
      user => "admin"
      password => "admin"
      # Consume from existing queue
      queue => "bpm_json_quorum"
      durable => true
      key => "json.taskEvent"
      arguments => {
        "x-queue-type" => "quorum"
      }
    }
  }

Log:
2024-02-06 14:22:30 [2024-02-06T13:22:30,875][ERROR][logstash.agent ] Failed to execute action {:action=>LogStash::PipelineAction::Create/pipeline_id:data_to_rabbitmq, :exception=>"Java::JavaLang::IllegalStateException", :message=>"Unable to configure plugins: (ConfigurationError) Something is wrong with your configuration.", :backtrace=>["org.logstash.config.ir.CompiledPipeline.<init>(CompiledPipeline.java:120)", "org.logstash.execution.AbstractPipelineExt.initialize(AbstractPipelineExt.java:186)", "org.logstash.execution.AbstractPipelineExt$INVOKER$i$initialize.call(AbstractPipelineExt$INVOKER$i$initialize.gen)", "org.jruby.internal.runtime.methods.JavaMethod$JavaMethodN.call(JavaMethod.java:846)", "org.jruby.ir.runtime.IRRuntimeHelpers.instanceSuper(IRRuntimeHelpers.java:1229)", "org.jruby.ir.instructions.InstanceSuperInstr.interpret(InstanceSuperInstr.java:131)", "org.jruby.ir.interpreter.InterpreterEngine.processCall(InterpreterEngine.java:361)", "org.jruby.ir.interpreter.StartupInterpreterEngine.interpret(StartupInterpreterEngine.java:72)", "org.jruby.internal.runtime.methods.MixedModeIRMethod.INTERPRET_METHOD(MixedModeIRMethod.java:128)", "org.jruby.internal.runtime.methods.MixedModeIRMethod.call(MixedModeIRMethod.java:115)", "org.jruby.runtime.callsite.CachingCallSite.call(CachingCallSite.java:85)", "org.jruby.RubyClass.newInstance(RubyClass.java:911)", "org.jruby.RubyClass$INVOKER$i$newInstance.call(RubyClass$INVOKER$i$newInstance.gen)", "org.jruby.runtime.callsite.CachingCallSite.call(CachingCallSite.java:85)", "org.jruby.ir.instructions.CallBase.interpret(CallBase.java:549)", "org.jruby.ir.interpreter.InterpreterEngine.processCall(InterpreterEngine.java:361)", "org.jruby.ir.interpreter.StartupInterpreterEngine.interpret(StartupInterpreterEngine.java:72)", "org.jruby.ir.interpreter.InterpreterEngine.interpret(InterpreterEngine.java:92)", "org.jruby.internal.runtime.methods.MixedModeIRMethod.INTERPRET_METHOD(MixedModeIRMethod.java:238)", 
"org.jruby.internal.runtime.methods.MixedModeIRMethod.call(MixedModeIRMethod.java:225)", "org.jruby.internal.runtime.methods.DynamicMethod.call(DynamicMethod.java:226)", "org.jruby.runtime.callsite.CachingCallSite.call(CachingCallSite.java:204)", "org.jruby.ir.interpreter.InterpreterEngine.processCall(InterpreterEngine.java:325)", "org.jruby.ir.interpreter.StartupInterpreterEngine.interpret(StartupInterpreterEngine.java:72)", "org.jruby.ir.interpreter.Interpreter.INTERPRET_BLOCK(Interpreter.java:116)", "org.jruby.runtime.MixedModeIRBlockBody.commonYieldPath(MixedModeIRBlockBody.java:136)", "org.jruby.runtime.IRBlockBody.call(IRBlockBody.java:66)", "org.jruby.runtime.IRBlockBody.call(IRBlockBody.java:58)", "org.jruby.runtime.Block.call(Block.java:143)", "org.jruby.RubyProc.call(RubyProc.java:309)", "org.jruby.internal.runtime.RubyRunnable.run(RubyRunnable.java:107)", "java.base/java.lang.Thread.run(Thread.java:833)"]}

Did you mean to put stdin { codec => rubydebug } in the input? Normally the rubydebug codec goes in the output, with stdout instead of stdin.
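
For reference, a minimal sketch of the more usual placement, with the rubydebug codec on a stdout output so that each event is printed in readable form. This fragment is only an illustration and is not taken from the configuration in the question:

  output {
    # Print every event to the console for inspection
    stdout { codec => rubydebug }
  }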

I'd recommend reducing your configuration section by section until it runs, then adding sections back one at a time until the error reappears. Let us know at which point the error goes away and at which point it comes back.
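
As a sketch of one possible first reduction step, assuming the elasticsearch input is kept exactly as posted and the rabbitmq output is temporarily swapped for stdout: if Logstash still fails to start with this, the problem is in the input block; if it starts, the problem is in the rabbitmq output. Starting Logstash with the --config.test_and_exit flag should also validate the configuration without running the pipeline.

  input {
    elasticsearch {
      # Options copied unchanged from the configuration in the question
      index => "camunda_task_event-%{+YYYY.MM.dd}"
      hosts => "${ELASTIC_HOSTS}"
      user => "${ELASTIC_USER}"
      password => "${ELASTIC_PASSWORD}"
      cacert => "certs/ca/ca.crt"
    }
  }

  output {
    # Temporary stand-in for the rabbitmq output; prints events to the console
    stdout { }
  }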

Also, can you please share the full Logstash log? My guess is that there's a log line before the one you shared that says what it doesn't like in the configuration.

Also, on the input I think the option is ssl_certificate_authorities, not cacert.

And on the output I think it's exchange, not queue.

Thanks for the reply. It is working now.
Working config file:

  input {
    elasticsearch {
      index => "camunda_task_event-2024.02.06"
      hosts => "${ELASTIC_HOSTS}"
      user => "${ELASTIC_USER}"
      password => "${ELASTIC_PASSWORD}"
      ssl_certificate_authorities => "certs/ca/ca.crt"
    }
  }

  output {
    rabbitmq {
      # connect to rabbit
      host => "host.docker.internal"
      port => 5672
      vhost => "/"
      user => "admin"
      password => "admin"
      # Publish to the topic exchange using the routing key below
      exchange => "bpm.topic"
      exchange_type => "topic"
      durable => true
      key => "json.taskEvent"
      arguments => {
        "x-queue-type" => "quorum"
      }
    }
  }