Logstash Pipeline Configuration & Inputs

Hi all,

Wanted to check an understanding on multiple pipelines:

My understanding of multiple pipelines is that they allow you to have different inputs and outputs for a specific set of filters, and that they provide better performance (summarized).

I came across this when I had different inputs, filters and outputs.

Using an example:

I have Filebeat sending multiple different logs from different sources to Logstash. In a thread on the Filebeat forum it was not recommended to use different ports with Filebeat, so that limits me to port 5044.

On my main pipeline input I use host => 0.0.0.0 and port => 5044 - all is good!
On my someother pipeline, can I still use host => 0.0.0.0 and port => 5044?

When I was doing some testing I kept getting a pipeline error that 0.0.0.0 port 5044 was already in use, and that pipeline would cause the plugin to fail and try to restart.

The other option I have is that the server has multiple NICs; I could use a different IP address with port 5044 and just add the additional host to the hosts section of the Filebeat config?

Would really appreciate the guidance.

There is no need to have multiple inputs if you want them on the same port. Just do conditional tagging in order to differentiate between them.

i.e.

filter {
  if [beat][name] == "filebeat" {
    # filtering for filebeat events goes here
  } else if [beat][name] == "metricbeat" {
    # filtering for metricbeat events goes here
  }
}

Hope that makes sense.

Multiple inputs cannot share the same host/port. You can however have a single input in a pipeline and then use conditionals to distribute the data to multiple distinct processing pipelines.

Thank you for the responses. @Christian_Dahlqvist, if I look at the documentation it says:
- pipeline.id: upstream
  config.string: input { stdin {} } output { pipeline { send_to => [myVirtualAddress] } }
- pipeline.id: downstream
  config.string: input { pipeline { address => myVirtualAddress } }

The way I understand that, if I may ask, is that this uses two separate pipelines:

  1. The first pipeline listens on 0.0.0.0 port 5044 for incoming events, so that is Filebeat sending all the different logs.
  2. The second pipeline, "downstream" - that's the part I don't understand.

So the upstream pipeline receives all the logs from Filebeat on a single address, i.e. 0.0.0.0 port 5044, and then passes these events to the downstream pipeline, and the downstream pipeline is where you do the filtering for the output to Elasticsearch?

Is that thinking correct, or am I still smoking my socks?

Thank you for the response.

To me the real issue with this is that the input file then becomes incredibly bloated if the filters are long and complicated (which they are).

I think my issue is wanting separate filters for separate pipelines, each in its own filter.conf.

You can have a single upstream pipeline that receives all data and then use conditionals to send the data to one of multiple downstream pipelines.
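
For illustration, a rough pipelines.yml sketch of that pattern could look like the following (the [type] value, virtual addresses and pipeline IDs here are just placeholders, not something from your setup):

- pipeline.id: upstream
  config.string: |
    input { beats { port => 5044 } }
    output {
      if [type] == "log1" {
        pipeline { send_to => [log1_events] }
      } else {
        pipeline { send_to => [other_events] }
      }
    }
- pipeline.id: log1-processing
  config.string: |
    input { pipeline { address => log1_events } }
    # filters for this log type would go here
    output { elasticsearch { hosts => ["localhost:9200"] } }

The upstream pipeline is the only one binding port 5044; the downstream pipelines only listen on internal virtual addresses, so there is no port conflict.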

Oh wow that is exactly what I was looking for.

Question: if I have multiple filter.conf files (basically one per pipeline), would one use something like tag == "abc" and then point it to a filter.conf?

How would that work? I am just trying to avoid one file with 100,000 lines of code.

Just trying to make sense of this before I mangle my pipeline if you don't mind.

So the "myVirtualAddress" is the defined name of the downstream pipeline created ?

- pipeline.id: beats-server
  config.string: |

I then see it uses config.string (as per the previous post with the virtual address); can one still use path.config pointing to a filter.conf file?

So my thinking if I am right:

- pipeline.id: beats-server
  config.string: |
    input { beats { port => 5044 } }  **--- so it has one input on port 5044**
    output {  **--- then outputs to the defined downstream pipelines**
        if [type] == log1 {
          pipeline { send_to => log1 }
        } else if [type] == log2 {
          pipeline { send_to => log2 }
        } else {
          pipeline { send_to => unknownEvent }
        }
    }
- pipeline.id: log1-processing
  config.string: |
    input { pipeline { address => log1 } }
    **===>>> Here is the difference: instead of having the filter here, use path.config: <Path_To_Filter.conf>, i.e. a .conf file rather than having the entire filter text in the pipelines.yml**
    output {
      elasticsearch { hosts => [es_cluster_a_host] }
    }

So this is how I wrote it. It doesn't work, but this is how I have constructed it, so I would really appreciate the help:

- pipeline.id: beats-server
  config.string: |
    input { beats { port => 5044 } }
    output {
        if [type] == fwlog {
          pipeline { send_to => fwlogs }
        } else if [type] == idpslogs {
          pipeline { send_to => idps }
        }
    }

- pipeline.id: firewall-processing
  config.string: |
    input { pipeline { address => fwlogs } }
    path.config: "/etc/logstash/conf.d/e-filter.conf"
    output {
      elasticsearch {
         hosts => ["localhost"]
         id => "output_elasticsearch_fwlogs"
         index => "logstash-%{+YYYY.MM.dd}"
       }
    }

- pipeline.id: idps-processing
  config.string: |
    input { pipeline { address => idpslogs } }
    path.config: "/etc/logstash/conf.d/20_filter.logstash.conf"
    output {
      elasticsearch {
         id => "output_elasticsearch_idps"
         hosts => ["localhost"]
         index => "idps-1.0.0-%{+YYYY.MM.dd}"
         template => "/etc/logstash/idps/templates/idps.template.json"
         template_name => "idpstempl-1.0.0"
         template_overwrite => "true"
  }
}
#use the normal pipeline config as this uses UDP on different port.
- pipeline.id: udpdata
   path.config: "/etc/logstash/udplog/conf.d/*.conf"

I am using the path.config here as well to point to my filter file. I did comment it out but it didn't make a difference.

In the first pipeline, output to stdout to verify that the fields exist and that the conditionals work. You can always also add an else output to capture anything that does not match. I also do not think you can mix config strings and paths, so I would recommend placing all the configs in separate files instead of using the config.string option.
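
For example, something along these lines in the first pipeline (a sketch only; the [type] values and virtual addresses match your config, while the file path for the catch-all output is just a placeholder):

output {
  # temporary troubleshooting output: prints every event with all its fields
  stdout { codec => rubydebug }
  if [type] == "fwlog" {
    pipeline { send_to => [fwlogs] }
  } else if [type] == "idpslogs" {
    pipeline { send_to => [idps] }
  } else {
    # catch-all so events that match neither conditional are not silently dropped
    file { path => "/var/log/logstash/unmatched.log" }
  }
}

Note that string values in Logstash conditionals need to be quoted.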

Thank you @Christian_Dahlqvist
Am I understanding you correctly that I must drop the idea above and go back to the normal path.config?

If so, I am back to not being able to use multiple inputs for the different log files that are processed by Filebeat.

Just checking

Do what you did above, but put each pipeline's logic in a file and reference that. Add a separate output to the pipeline with the input for troubleshooting, so you know whether the data looks like you expect it to or not.

You cannot have a path.config within the config.string parameter.
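
So pipelines.yml would only map pipeline IDs to config files, along the lines of the following (the .conf paths are just example names, apart from your existing udp one):

- pipeline.id: beats-server
  path.config: "/etc/logstash/conf.d/beats-server.conf"
- pipeline.id: firewall-processing
  path.config: "/etc/logstash/conf.d/firewall-processing.conf"
- pipeline.id: idps-processing
  path.config: "/etc/logstash/conf.d/idps-processing.conf"
- pipeline.id: udpdata
  path.config: "/etc/logstash/udplog/conf.d/*.conf"

Each referenced .conf file then holds that pipeline's complete input, filter and output sections, e.g. firewall-processing.conf would start with input { pipeline { address => fwlogs } }, contain the filters you currently have in e-filter.conf, and end with your elasticsearch output.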

@Christian_Dahlqvist
Firstly thank you for taking me through this I really do appreciate this!

Ok so checking logic:

  1. I separate out each pipeline's logic, leaving the first block in the normal pipelines.yml,
    i.e. pipelines.yml would look like:

     - pipeline.id: beats-server
       config.string: |
         input { beats { port => 5044 } }
         output {
             if [type] == fwlog {
               pipeline { send_to => fwlogs }
             } else if [type] == idpslogs {
               pipeline { send_to => idps }
             }
         }
    
  2. Then I create a separate pipeline file for each of the other blocks, i.e.

fwpipeline.yml

pipeline.id: firewall-processing
  config.string: |
    input { pipeline { address => fwlogs } }
    **path.config: "/etc/logstash/conf.d/e-filter.conf"** <<---- remove this
    output {
      elasticsearch {
         hosts => ["localhost"]
         id => "output_elasticsearch_fwlogs"
         index => "logstash-%{+YYYY.MM.dd}"
       }
    }

idpspipeline.yml

pipeline.id: idps-processing
  config.string: |
    input { pipeline { address => idpslogs } }
    **path.config: "/etc/logstash/conf.d/20_filter.logstash.conf"** <<-- remove this
    output { 
      elasticsearch {
         id => "output_elasticsearch_idps"
         hosts => ["localhost"]
         index => "idps-1.0.0-%{+YYYY.MM.dd}"
         template => "/etc/logstash/idps/templates/idps.template.json"
         template_name => "idpstempl-1.0.0"
         template_overwrite => "true"
  }
}

Question: in my last pipeline, which is simple (udpPipeline.yml):

        pipeline.id: udpdata
        path.config: "/etc/logstash/udplog/conf.d/*.conf"

Can I still use that as configured in a different pipeline file, or will it pick up that it's using "path.config"?

Two other questions of understanding:

  1. If one cannot reference a config file for the filter, does that mean one is forced to put all the filter logic in the pipeline config string - is my understanding correct? I just don't want to go and move all that logic etc. if there is a better way to do it.

  2. I am sorry for the stupid question but when you say

So the way I understand this: usually I would reference a config file for a pipeline using path.config. Now, keeping my first block:

pipeline.id: beats-server
  config.string: |
    input { beats { port => 5044 } }
    output {
        if [type] == fwlog {
          pipeline { send_to => fwlogs }
        } else if [type] == idpslogs {
          pipeline { send_to => idps }
        }
    }

How would I add the reference if I can't use path.config? I read the documentation on this but I can't see where it is explained.

Great idea for stdout - I am going to add that to the file!

Again, thank you very much for the help. I never knew about this config and it makes perfect sense; it's now just the last few things.

@Christian_Dahlqvist

Interesting: when I remove path.config completely and just use the file as above for pipelines.yml, Logstash just dies.

In the logs: it loads the config, starts loading pipelines.yml, then I get a Java exception / system exit error and Logstash stops.
Version 6.5.1

[2018-11-28T19:10:35,494][DEBUG][logstash.runner          ] --------------- Logstash Settings -------------------
[2018-11-28T19:10:35,552][DEBUG][logstash.config.source.multilocal] Reading pipeline configurations from YAML {:location=>"/etc/logstash/pipelines.yml"}
[2018-11-28T19:10:35,609][ERROR][org.logstash.Logstash    ] java.lang.IllegalStateException: Logstash stopped processing because of an error: (SystemExit) exit

Put the old pipelines file back and all is good... (starts as per normal)

I looked at the config above and all the blocks are closed correctly etc., so I am not sure what Logstash doesn't like.

This is what my pipelines.yml looks like:

- pipeline.id: beats-server
  config.string: |
    input { beats { port => 5044 } }
    output {
        if [type] == suricata {
          pipeline { send_to => suricata }
        } else if [type] == snort {
          pipeline { send_to => snort }
        }
   }
- pipeline.id: suricata-processing
  config.string: |
    input { pipeline { address => suricata } }
    output {
      elasticsearch {
         hosts => ["localhost"]
         id => "output_elasticsearch_suricata"
         index => "logstash-%{+YYYY.MM.dd}"
       }
    }
- pipeline.id: snort-processing
  config.string: |
    input { pipeline { address => snort } }
    output {
      elasticsearch {
       id => "output_elasticsearch_snort"
       hosts => ["localhost"]
       index => "snort-%{+YYYY.MM.dd}"
       template => "/etc/logstash/snort/templates/snort.template.json"
       template_name => "snort-1.0.0"
       template_overwrite => "true"
  }
}

@Christian_Dahlqvist - Ok so looking deeper into it:

A journalctl -u logstash.service -f shows:

Nov 29 17:10:53 machine logstash[29813]: [2018-11-29T17:10:53,209][DEBUG][logstash.config.source.multilocal] Reading pipeline configurations from YAML {:location=>"/etc/logstash/pipelines.yml"}
Nov 29 17:10:53 machine logstash[29813]: ERROR: Failed to read pipelines yaml file. Location: /etc/logstash/pipelines.yml

My previous post has the pipeline config; it's mostly a copy and paste from the website, but I changed a few names to be more aligned with my setup.

I am on version 6.5.1 -- is there a required version for this pipeline feature, as I do see it is in beta?
