Storing RabbitMQ Logs in ELK

Hi,
I have been using ELK for about six months now, and it's been great so far. I'm on Logstash version 6.2.3.
RabbitMQ is the heart of my distributed system (RabbitMQ is itself distributed), so it is important that I track its logs.
Most other threads on this forum seem to be about using RabbitMQ as a Logstash input/output stage, but I just want to monitor RabbitMQ's own logs.
The only problem I'm running into is that RabbitMQ writes multiline log entries, like so:

=WARNING REPORT==== 19-Nov-2017::06:53:14 ===
closing AMQP connection <0.27161.0> (...:32799 -> ...:5672, vhost: '/', user: 'fnx_worker'):
client unexpectedly closed TCP connection

=WARNING REPORT==== 19-Nov-2017::06:53:18 ===
closing AMQP connection <0.22410.0> (...:36656 -> ...:5672, vhost: '/', user: 'fnx_worker'):
client unexpectedly closed TCP connection

=WARNING REPORT==== 19-Nov-2017::06:53:19 ===
closing AMQP connection <0.26045.0> (...:55427 -> ...:5672, vhost: '/', user: 'fnx_worker'):
client unexpectedly closed TCP connection

=WARNING REPORT==== 19-Nov-2017::06:53:20 ===
closing AMQP connection <0.5484.0> (...:47740 -> ...:5672, vhost: '/', user: 'fnx_worker'):
client unexpectedly closed TCP connection

I have found a promising code example here, which I have stripped down to just the filter stage, so that it looks like this:

filter {
    if [type] == "rabbitmq" {
        codec => multiline {
            pattern => "^="
            negate => true
            what => "previous"
        }
        grok {
            type => "rabbit"
            patterns_dir => "patterns"
            pattern => "^=%{WORD:report_type} REPORT=+ %{RABBIT_TIME:time_text} ===.*$"
        }
        date {
            type => "rabbit"
            time_text => "dd-MMM-yyyy::HH:mm:ss"
        }
        mutate {
            type => "rabbit"
            add_field => [
                "message",
                "%{@message}"
            ]
        }
        mutate {
            gsub => [
                "message", "^=[A-Za-z0-9: =-]+=\n", "",
                # interpret message header text as "severity"
                "report_type", "INFO", "1",
                "report_type", "WARNING", "3",
                "report_type", "ERROR", "4",
                "report_type", "CRASH", "5",
                "report_type", "SUPERVISOR", "5"
            ]
        }
    }
}

But when I save this to a conf file and restart Logstash, I get the following error:

[2018-04-04T07:01:57,308][INFO ][logstash.modules.scaffold] Initializing module {:module_name=>"fb_apache", :directory=>"/usr/share/logstash/modules/fb_apache/configuration"}
[2018-04-04T07:01:57,316][INFO ][logstash.modules.scaffold] Initializing module {:module_name=>"netflow", :directory=>"/usr/share/logstash/modules/netflow/configuration"}
[2018-04-04T07:01:57,841][INFO ][logstash.runner          ] Starting Logstash {"logstash.version"=>"6.2.3"}
[2018-04-04T07:01:57,973][INFO ][logstash.agent           ] Successfully started Logstash API endpoint {:port=>9600}
[2018-04-04T07:01:58,037][ERROR][logstash.agent           ] Failed to execute action {:action=>LogStash::PipelineAction::Create/pipeline_id:main, :exception=>"LogStash::ConfigurationError", :message=>"Expected one of #, { at line 3, column 15 (byte 54) after filter {\n    if [type] == \"rabbitmq\" {\n        codec ", :backtrace=>["/usr/share/logstash/logstash-core/lib/logstash/compiler.rb:42:in `compile_imperative'", "/usr/share/logstash/logstash-core/lib/logstash/compiler.rb:50:in `compile_graph'", "/usr/share/logstash/logstash-core/lib/logstash/compiler.rb:12:in `block in compile_sources'", "org/jruby/RubyArray.java:2486:in `map'", "/usr/share/logstash/logstash-core/lib/logstash/compiler.rb:11:in `compile_sources'", "/usr/share/logstash/logstash-core/lib/logstash/pipeline.rb:51:in `initialize'", "/usr/share/logstash/logstash-core/lib/logstash/pipeline.rb:169:in `initialize'", "/usr/share/logstash/logstash-core/lib/logstash/pipeline_action/create.rb:40:in `execute'", "/usr/share/logstash/logstash-core/lib/logstash/agent.rb:315:in `block in converge_state'", "/usr/share/logstash/logstash-core/lib/logstash/agent.rb:141:in `with_pipelines'", "/usr/share/logstash/logstash-core/lib/logstash/agent.rb:312:in `block in converge_state'", "org/jruby/RubyArray.java:1734:in `each'", "/usr/share/logstash/logstash-core/lib/logstash/agent.rb:299:in `converge_state'", "/usr/share/logstash/logstash-core/lib/logstash/agent.rb:166:in `block in converge_state_and_update'", "/usr/share/logstash/logstash-core/lib/logstash/agent.rb:141:in `with_pipelines'", "/usr/share/logstash/logstash-core/lib/logstash/agent.rb:164:in `converge_state_and_update'", "/usr/share/logstash/logstash-core/lib/logstash/agent.rb:90:in `execute'", "/usr/share/logstash/logstash-core/lib/logstash/runner.rb:348:in `block in execute'", "/usr/share/logstash/vendor/bundle/jruby/2.3.0/gems/stud-0.0.23/lib/stud/task.rb:24:in `block in initialize'"]}

Any ideas what the issue could be?
Thanks,

Don't parse the text logs. Just set up Logstash to consume from a queue bound to the amq.rabbitmq.log exchange.

filter {
    if [type] == "rabbitmq" {
        codec => multiline {
            pattern => "^="
            negate => true
            what => "previous"
        }

Codecs are specified for inputs. You can't list a codec in the filter block.
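
For example, with a file input it would look something like this (the path is just a guess at a typical RabbitMQ log location, so adjust it to your setup):

input {
    file {
        path => "/var/log/rabbitmq/rabbit@myhost.log"   # hypothetical path
        type => "rabbitmq"
        codec => multiline {
            # Any line that does not start with "=" is appended to the previous event.
            pattern => "^="
            negate => true
            what => "previous"
        }
    }
}

Also note that the grok and date filters in your snippet use pre-Logstash-2.0 options (type =>, pattern =>, time_text =>), so on 6.2 you would hit further errors even after moving the codec; those have since been replaced by match => { ... } style settings.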


Thanks, I'm not familiar with the idea of "a queue bound to the amq.rabbitmq.log exchange". I usually use Filebeat to send logs to Logstash; can I use Filebeat for this purpose?

No, Filebeat can't consume from RabbitMQ queues. Look into Logstash's rabbitmq input.
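
Something along these lines (the host and queue name are placeholders; amq.rabbitmq.log is the built-in exchange RabbitMQ publishes its log messages to):

input {
    rabbitmq {
        host => "localhost"                 # placeholder broker address
        exchange => "amq.rabbitmq.log"      # built-in log exchange
        key => "#"                          # routing key is the severity, so "#" catches all
        queue => "logstash_rabbitmq_log"    # any queue name; it gets bound to the exchange
        durable => false
        type => "rabbitmq"
    }
}

The log messages then arrive one per AMQP message, so no multiline handling is needed at all.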


Since RabbitMQ now uses lager, you can use the standard lager syslog backend.
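
If you go that route, the Logstash side is just a syslog input; a minimal sketch (the port is arbitrary, and enabling syslog output on the RabbitMQ side depends on your version, e.g. log.syslog = true in rabbitmq.conf on 3.7+):

input {
    syslog {
        port => 5514    # example port; point RabbitMQ's syslog output here
        type => "rabbitmq"
    }
}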

Please report back if and how you got it working!
