Logstash: configure multiline configuration for specific type of logs only

Hey Folks,

We have an existing Logstash configuration where we use the field "log_type" to separate the different types of logs and parse them with their respective grok patterns into multiple respective indexes.

Now we are trying to add a new multiline log configuration to this.

So far the input configuration has been simple, just accepting the stream from the Filebeat port.

The new change which needs to be achieved is:
if the "log_type" of the input stream is, let's say, "foo", then run the multiline plugin, normalise the multiple lines into single events as per the pattern, and forward them to grok for splitting into a proper document with the respective fields.

Given below is the input configuration we've tried.

input {
        beats {
        port => 5044
        if [fields][log_type] == "foo" {

                codec => multiline      {
                        pattern => "\#\s+Time:\s+%{BASE10NUM:ts}\s+%{TIME:ts2}"
                        negate => true
                        what => "previous"
                }
        }

        }
}

With the above configuration we get the following error when we try to start Logstash.

[2024-08-09T11:43:13,376][ERROR][logstash.agent           ] Failed to execute action {:action=>LogStash::PipelineAction::Create/pipeline_id:main, :exception=>"LogStash::ConfigurationError", :message=>"Expected one of [ \\t\\r\\n], \"#\", \"=>\" at line 4, column 5 (byte 36) after input {\n\tbeats {\n\tport => 5044\n\tif ", :backtrace=>["/usr/share/logstash/logstash-core/lib/logstash/compiler.rb:32:in `compile_imperative'", "org/logstash/execution/AbstractPipelineExt.java:239:in `initialize'", "org/logstash/execution/AbstractPipelineExt.java:173:in `initialize'", "/usr/share/logstash/logstash-core/lib/logstash/java_pipeline.rb:48:in `initialize'", "org/jruby/RubyClass.java:949:in `new'", "/usr/share/logstash/logstash-core/lib/logstash/pipeline_action/create.rb:49:in `execute'", "/usr/share/logstash/logstash-core/lib/logstash/agent.rb:386:in `block in converge_state'"]}

Can someone please guide us in fixing the configuration, the approach, or both?

stack:
Elasticsearch ver: 8.15.0
Logstash ver: 8.15.0
Filebeat ver: 8.15.0

Regards

There are a couple of things that are wrong in your configuration.

First, you cannot have a conditional inside an input as you did; conditionals are not supported in the input block at all.

You can only use conditionals in the filter and output blocks.
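For example, the per-type routing described in the question would be expressed in the filter block instead. This is only a sketch, assuming the field arrives as [fields][log_type] and using a placeholder grok pattern:

```conf
filter {
  if [fields][log_type] == "foo" {
    grok {
      # Placeholder pattern; replace with the real one for your log format.
      match => { "message" => "%{GREEDYDATA:parsed_message}" }
    }
  }
}
```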

Another thing: since you are using the beats input, the multiline configuration needs to be done on the Beats side, not in Logstash.

This is mentioned here in the multiline codec documentation.

So, you will need to move your multiline configuration to Filebeat following this documentation.

OK, done as suggested above.
Here is my filebeat.yml:

filebeat.inputs:
- type: filestream
  paths:
    - /var/log/mysql_slow.log
  multiline.type: pattern
  multiline.pattern: \#\s+Time:\s+%{BASE10NUM:ts}\s+%{TIME:ts2}
  multiline.negate: true
  multiline.match: after
  fields:
    log_type: mysql-slowlog
output.logstash:
  hosts: ["localhost:5044"]
  loadbalance: true
  index: mysql-slow

And here is my input.conf for Logstash:

input {
  beats {
    port => 5044
  }
}

However, when I go ahead with this configuration, Elasticsearch receives 42 documents instead of 2.

One doubt I have: does the Filebeat configuration support grok patterns for pattern matching? I will try changing it to a plain regex in the config while I post this for you.

I tried changing the multiline pattern in Filebeat

From

multiline.pattern: \#\s+Time:\s+%{BASE10NUM:ts}\s+%{TIME:ts2}

to

multiline.pattern: "\\#\\s+Time:\\s+[0-9]{6}\\s+[0-9].?\\:[0-9].?\\:[0-9].?"

Still getting 42 documents in ES instead of just 2.

Wondering what is going wrong with my configuration.

This configuration is wrong; please check the documentation for the right format.

You are using the filestream input, so your configuration needs to be in this format:

parsers:
- multiline:
    type: pattern
    pattern: 'YOUR-PATTERN'
    negate: true
    match: after

No, it does not; you need a plain regex pattern that matches the start of each multiline event. The grok-style pattern you are using will not work.
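For the MySQL slow log header in question, an anchored plain regex along these lines should work. This is only a sketch, assuming the older slow-log time format with a six-digit date (e.g. "# Time: 240809 11:43:13"), matching the [0-9]{6} in the original grok pattern:

```yaml
parsers:
- multiline:
    type: pattern
    # Plain regex, no grok: anchor on the "# Time:" header that
    # starts each slow-query entry.
    pattern: '^# Time: [0-9]{6}'
    negate: true
    match: after
```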

Here is my new config file for filebeat.

filebeat.inputs:
- type: filestream
  paths:
    - /var/log/mysql_slow.log
parsers:
- multiline:
   type: pattern
   pattern: '#.*Time:.*[0-9][0-9][0-9][0-9][0-9][0-9].*'
   negate: true
   match: after
  fields:
    log_type: mysql-slowlog

I tried multiple regexes and escape sequences for the pattern matching, and also tried turning the negate flag to false; however, nothing changed, I am still receiving 42 documents instead of 2 in ES.

Is there anything which needs to be added to the Logstash config to identify such a filestream?

Finally found a fix, after correcting the indentation in the Filebeat file.

filebeat.inputs:
- type: filestream
  paths:
    - /var/log/mysql_slow.log
  fields:
    log_type: mysql-slowlog

  parsers:
  - multiline:
      type: pattern
      pattern: '#.*Time:.*[0-9][0-9][0-9][0-9][0-9][0-9].*'
      negate: true
      match: after

I was expecting Filebeat to crash if the YAML indentation were incorrect; strangely, however, when I restarted the service a number of times, it went through without throwing any errors.
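The behaviour makes sense once you notice that the mis-indented file is still valid YAML: parsers simply becomes an unrelated top-level key instead of a setting of the filestream input, so Filebeat starts fine but never applies the multiline parser. A minimal sketch with PyYAML (assumed installed) shows where the key ends up in each case:

```python
import yaml

# Mis-indented: 'parsers' sits at the top level of the file,
# not under the filestream input entry.
wrong = """
filebeat.inputs:
- type: filestream
  paths:
    - /var/log/mysql_slow.log
parsers:
- multiline:
    type: pattern
"""

# Corrected: 'parsers' is indented under the input entry.
right = """
filebeat.inputs:
- type: filestream
  paths:
    - /var/log/mysql_slow.log
  parsers:
  - multiline:
      type: pattern
"""

w = yaml.safe_load(wrong)
r = yaml.safe_load(right)

# In the wrong config, the input entry itself never sees the parser.
print("parsers" in w["filebeat.inputs"][0])  # False
print("parsers" in r["filebeat.inputs"][0])  # True
```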

Thanks for your help @leandrojmp, you helped me understand this.

Now I will take my time playing around with different regexes and other optimisations.