LogStash with distributor pattern, relay pipelines problem

Hi everyone,

i recently changed my LogStash configuration from multiple pipelines input to 3 pipelines, relaying on another local pipelines with distributor.

Now, i've tried to send and udp json to the udp pipeline, with the tag "localadmin" as u see in my configuration. The log arrives on the logstash node, but it seems that something doesnt work, as the data were not indexed on the elastic cluster.

Below my pipelines.yml configuration:

- pipeline.id: udp
  config.string: |
    input { udp { port => 514 } }

    output {
        if [type] == "localadmin" {  
          pipeline { send_to => localadmin } 
        } else if [type] == "passwordsynchronizer" { 
          pipeline { send_to => passwordsynchronizer }
        } else if [type] == "passwordstate" { 
          pipeline { send_to => keys }
        } else if [type] == "xibo" { 
          pipeline { send_to => xibo }
        }
    }
- pipeline.id: beats
  config.string: |
    input { beats { port => 5044 } }

    output {
        if [agent][type] == "filebeat" { 
          pipeline { send_to => exchange }
        } else if [agent][type] == "winlogbeat" {
          pipeline { send_to => sharepoint }
        }
    }
- pipeline.id: http
  config.string: |
    input { http { port => 80 } }

    output {
        if [type] == "gecov" {
          pipeline { send_to => gecov }
        }
    }
- pipeline.id: localadmin
  path.config: "/etc/logstash/conf.d/distributor/localadmin.conf"
- pipeline.id: passwordsynchronizer
  path.config: "/etc/logstash/conf.d/distributor/passwordsynchronizer.conf"
- pipeline.id: passwordstate
  path.config: "/etc/logstash/conf.d/distributor/keys.conf"
- pipeline.id: xibo
  path.config: "/etc/logstash/conf.d/distributor/xibo.conf"
- pipeline.id: exchange
  path.config: "/etc/logstash/conf.d/distributor/exchange.conf"
- pipeline.id: sharepoint-microsoft
  path.config: "/etc/logstash/conf.d/distributor/sharepoint.conf"
- pipeline.id: gecov
  path.config: "/etc/logstash/conf.d/distributor/gecov.conf"

And here the local pipeline to which it points:

input { pipeline { address => localadmin } }
output {
 elasticsearch {
      hosts => ["firstnode:port", "secondnode:port", "thirdnode:port"] 
      ssl => true
      ssl_certificate_verification => false
      user => someone
      password => 'somepassword'
      ilm_enabled => false
      index => "localadmin"
    }
  }

Thanks in advance :slight_smile:

How does the document you are sending looks like?

You do not have json filter in the udp pipeline nor are you using the json codec in the input, so your message is not being parsed and there is no type field to be filtered.

Hi @leandrojmp, thank you for the fast replay.

This is the message:

[
{
    "type": "localadmin",
    "blabla": {
    "Users": {
      "Created": [],
      "Deleted": []
    },
    "Administrators": {
      "Added": [],
      "Removed": []
    },
    "campo": "localadmin",
    "Host": {
      "Type": [
        "Client"
      ],
      "Name": [
        "itk-dk-02"
      ]
    },
    "host": "ipaddress"
  }
}
]

Can you tell me where to put the codec filter in the input? directly below the

input { pipeline { address => localadmin } }

?

No, you need to parse the message in the first pipeline that is receiving it, which is the one with the udp input.

You are using a field from the message to do the filtering, so if you do not parse it, you do not have the field to filter in your message and your output will not work correctly.

Try this:

input { 
    udp { 
        port => 514 
        codec => "json"
    } 
}

Ok just edited, using :

- pipeline.id: udp
  config.string: |
    input { udp { port => 514 codec => "json" } }

works fine for json input. I see the data on elastic in the right index.

But unfortunately on that port i have other inputs that aren't always formatted in json.

I need something like:

input { pipeline { address => localadmin codec => "json"} }

Maybe using the json codec directly in the filter section of th pipeline?

Thankyou.

Just understood my mistake, if logstash doesnt know the tipe of the data that comes, it can't sort by tags and send to other pipelines.
The goal was to have fewer listening ports as possible on that node.

I don't think that the pipeline input has the codec option, it seems to have only the send_to and address options.

You will need to filter for some string in the message since you can't parse the message.

Something like this:

input {
    upd {
        port => 514
    }
}
output {
    if "localadmin" in [message] {
        pipeline { send_to => localadmin }
    } else if "passwordsyncrhonizer" in [message] { 
        pipeline { send_to => passwordsynchronizer }
    } other else if conditions
}

I would also suggest that you use pipelines.yml to only point to the configs, and have the configs in separated files.

For example, create a udp.conf file with the udp pipeline and just point to this file in the pipelines.yml file, having the configurations in the pipelines.yml can lead to confusion and mistake if your configuration grows.

- pipeline.id: udp
  path.config: "/etc/logstash/conf.d/udp.conf"
1 Like

Perfect, it works. Thanks :black_heart:

This topic was automatically closed 28 days after the last reply. New replies are no longer allowed.