Multipipeline with Filebeat, am I doing it right?

Hi, I'm new to the ELK Stack and I've been assigned the simple task of using this technology to store and analyze our logs.
Quick intro: we have a folder containing logs, one for each microservice. A new log is produced every day with a date in the filename (e.g. ms0_21052018.log).
These logs are similar but not identical, so I opted for n pipelines, one for each microservice.
Each Logstash config file reads from Filebeat, applies some filters and outputs to the console (this will change to Elasticsearch).

I run logstash with "bin/logstash" which reads the pipelines.yml and starts everything I put in there. However, am I correct if I say that each line of each log gets pushed to every pipeline I have?
I know I can check the input, filtering by the "source" field, but I still don't like the idea that all new lines get forwarded to all pipelines.

Question: is there a way to selectively push Beat's new log lines to their corresponding pipelines, rather than checking the input in the pipeline config file?

Thank you.

If you have a single filebeat reading logs and writing to a logstash output then only a single logstash pipeline can consume that. If you try to start two pipelines listening on the same port then the second will get an address already in use error.

You can start a different filebeat instance for each type of log, writing to different ports.

You can add conditions around all the processing for the different logs in the same pipeline.
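As a rough sketch of that option, the conditions could key off the "source" field mentioned above. The `ms0_` and `ms1_` patterns and the tag names are assumptions for illustration, based on the example filename in the question; the real filters for each log would go where the `mutate` placeholders are.

```
filter {
  # Filebeat puts the path of the originating file in [source].
  if [source] =~ /ms0_/ {
    # filters for microservice 0 go here (placeholder action)
    mutate { add_tag => ["ms0"] }
  } else if [source] =~ /ms1_/ {
    # filters for microservice 1 go here (placeholder action)
    mutate { add_tag => ["ms1"] }
  }
}
```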

You can add one extra pipeline that just does routing: it consumes all the output from a single filebeat and routes to different tcp outputs based on tags added by the prospectors. Each type of log is then processed in its own pipeline.
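A minimal sketch of the routing idea, assuming each prospector adds a tag such as "ms0" or "ms1" and that local ports 6000 and 6001 are free. The file names, tags and port numbers are all hypothetical, and the two files would be listed as separate pipelines in pipelines.yml.

```
# router.conf: the only pipeline that listens for Filebeat
input { beats { port => 5044 } }
output {
  if "ms0" in [tags] {
    tcp { host => "127.0.0.1" port => 6000 codec => json_lines }
  } else if "ms1" in [tags] {
    tcp { host => "127.0.0.1" port => 6001 codec => json_lines }
  }
}

# ms0.conf: one processing pipeline per log type
input { tcp { port => 6000 codec => json_lines } }
filter {
  # filters specific to ms0 go here
}
output { stdout { codec => rubydebug } }
```

Each processing pipeline keeps its own settings (workers, batch size) in pipelines.yml, at the cost of one extra hop over a local socket.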

It would not surprise me if I have missed some of the options.

Thank you, Badger.
Starting another Filebeat instance for each log file seems excessive, and I'd prefer not to have a single pipeline with a lot of conditions because it would be too confusing.
I like the routing pipeline idea and I'll try it. However, it seems to me that all these solutions kill the advantages of a multipipeline setup (different workers, batch size etc.). Tell me if I'm wrong.

Anyway, just to practice, at the moment I'm using n pipelines reading from the same Beat port, with each pipeline filtering on the source file name.

So, my filebeat.yml has one "input", log type, reading all the *.log files from a directory.
My pipelines.yml has as many pipelines as I need, each one pointing to a different config file.

Problem: I need to filter on the timestamp of the log filename so I can read only today's logs, but I'm having some trouble.

I've extracted the timestamp that I need like this:
grok { match => { "@timestamp" => "(?<yyyymmdd>[0-9]{4}\-[0-9]{2}\-[0-9]{2})"} }

and with the mutate filter I've built a string equal to the log filename that I want to read. From the rubydebug output: "logfilename" => "logFile_ms0.2018-06-27.log",

but this IF doesn't work:
if [source] =~ "(.)*[logfilename]" { /* do something */ }

because it reads from every single log file in the directory (even if the filename is totally different).
Perhaps it's due to the (.)* that I put to catch the file path. How do I use a regexp and a field name in this IF?

Thank you.

logstash will not do %{fieldname} substitution in that context. Inside the regex, [logfilename] is read as a character class rather than a field reference, which is why nearly every path matches. I think you have to resort to ruby. For that regex you can use String#include? instead of a regex.

    ruby {
        code => '
            if event.get("source").include? event.get("logfilename")
                event.set("matched", true)
            end
        '
    }

Yep, I've come to the same conclusion so I used the Ruby filter. I still had a problem with the IF-THEN though.
This is the config:

input { beats { port => 5044 } }
filter {
	grok { match => { "@timestamp" => "(?<yyyymmdd>[0-9]{4}\-[0-9]{2}\-[0-9]{2})" } }
	ruby {
		code => "
			event.set('matched', false)
			inputfilename = File.basename(event.get('source'))
			expectedfilename = 'logFile_ms0.' + event.get('yyyymmdd') + '.log'
			if inputfilename == expectedfilename
				event.set('matched', true)
			end
		"
	}
	mutate { add_field => { "evaluation" => "%{matched}" } } # debug
	if ("%{matched}") { /* do something */ }
}
output { stdout {codec => rubydebug} }

Everything is good up to the IF.
The "matched" field is set correctly and the syntax "%{matched}" to read the value also seems correct, as the field "evaluation" gets populated with the same value as "matched".

However, I've tried

  • if ("%{matched}")
  • if ("%{matched}" == "true")
  • if ("%{matched}" =~ "true")
  • if ("true" in "%{matched}")
  • if ("%{matched} == true")

and a few other combinations. None of them worked as expected: it enters the IF even when "matched" is false.
I'm feeling pretty stupid right now, I can't grasp how Logstash's syntax works :frowning:

    if [matched] {
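Spelled out a little more, as a sketch rather than a definitive config: conditionals take field references in square brackets, while the "%{...}" form is for string options inside plugins such as add_field. The tag name and the drop in the else branch are assumptions about what "do something" might be when only today's file is wanted.

```
filter {
  # [matched] refers to the field set by the ruby filter; the branch is
  # taken only when that field exists and is not false.
  if [matched] {
    mutate { add_tag => ["todays_log"] }   # hypothetical action
  } else {
    drop { }                               # assumption: discard lines from other files
  }
}
```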

I definitely need a vacation. :man_facepalming: :laughing:
Thanks for your help, Badger!