Multiple pipelines - distributor pattern

Looks like I'm having a similar problem to another thread that was closed without being resolved.

I am trying to use the distributor pattern. The linked doc makes some sense, but I have a few remaining questions.

  1. Regarding my pipeline.conf, what should be the input? I see a reference to "pipeline" input, but there is no "pipeline" input plugin.
  2. In my pipelines.yml, is it possible for me to keep my path.config: structure? Or do I have to migrate my configurations to config.string: as the doc suggests? I've grown fond of the modularity provided by the separate pipeline.conf files and intend to work them into our SCM soon, giving control of certain pipelines to different users.
  3. In the example pipelines.yml: pipeline { send_to => weblogs }, is "weblogs" the name of the pipeline.id?

Once I figure this out for myself, I might have some edit suggestions to give the pipeline-to-pipeline doc a clearer flow. Starting the doc with some example files that use the same naming scheme as the patterns below would make a lot of sense, and would fit other Elastic docs I have seen.

  1. There is no separate pipeline plugin. Unlike other inputs, it is implemented in the Logstash core itself.

  2. Yes, you can keep using files; there is no need to switch to config.string. The documentation uses config.string simply because it keeps the examples self-contained.

  3. No, weblogs is not the pipeline.id; it is the virtual address of the pipeline input. The pipeline itself can be called anything.

Glad to hear that. Still, I cannot find examples of its usage in a pipeline.conf. Can I assume the syntax is something like:

input {
  pipeline {
    address => virtual.address
  }
}

Very glad to hear that, and it should answer my question completely.

How is the virtual address defined? Is this not done in the send_to statement?

I'm looking at the example code:

# config/pipelines.yml
- pipeline.id: beats-server
  config.string: |
    input { beats { port => 5044 } }
    output {
        if [type] == apache {
          pipeline { send_to => weblogs }
        } else if [type] == system {
          pipeline { send_to => syslog }
        } else {
          pipeline { send_to => fallback }
        }
    }
- pipeline.id: weblog-processing
  config.string: |
    input { pipeline { address => weblogs } }
    filter {
       # Weblog filter statements here...
    }
    output {
      elasticsearch { hosts => [es_cluster_a_host] }
    }
...

Here it seems that the input and output I'm expected to use are spelled out.

Taking all of this into account, I have built a simple pipeline to test:

#master-pipeline.conf
input {
  beats {
    port => "5053"
  }
}
output {
  if [type] == log {
    pipeline {
      send_to => legacy-gateway 
    }
  }
}


#legacy-gateway-pipeline.conf
input {
  pipeline {
    address => legacy-gateway
  }
}
output {
  elasticsearch {
    hosts => ["ES CLUSTER ADDRESS"]
    index => "legacy-gateway"
  }
}

However, my events no longer make it to ES after converting this from a single pipeline. Is there something I'm missing here?

The virtual address on the input is set using the address option, and on the output it is set using the send_to option.

I would expect that config to work provided that the events have [type] set to "log".
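To make the pairing concrete, here is a minimal sketch of the two halves (the address name my_virtual_address is only an illustration):

# upstream pipeline: forwards its events
output {
  pipeline { send_to => "my_virtual_address" }
}

# downstream pipeline: receives them
input {
  pipeline { address => "my_virtual_address" }
}

The address and send_to values must match exactly for events to flow between the two pipelines.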

Hmm... Yes, the filebeat input that I am trying to ship is configured with - type: log.

I'll try setting the config in pipelines.yml to see if that makes a difference while I wait for additional advice.

edit> As I'm thinking this through, I checked netstat -aon | grep 5053 and I do not get the result I am expecting. Looks like, for some reason, logstash is not creating a listener on that port with the new distributor pattern configuration.

Alright, I've boiled it down to the configuration not creating a listener. I've tested this in the dedicated pipeline.conf files, as well as editing the pipelines.yml file directly. The current configuration is as follows:

#pipelines.yml
- pipeline.id: master-pipeline
  config.string: |
    input { beats { port => "5053" } }
    output {
        if [type] == log {
          pipeline { send_to => legacy-gateway }
        } else if [type] == something {
          pipeline { send_to => somethingelse }
        }
    }
- pipeline.id: legacy-gateway
  config.string: |
    input { pipeline { address => legacy-gateway } }
    output {
      elasticsearch { hosts => ["ES CLUSTER ADDRESS"]
          index => "legacy-gateway" }
    }

I've made the code match the example in the doc as closely as I can, to make it easy for us to compare. I wasn't sure how YAML feels about if statements with only one line, so I added a throwaway else if branch.

# netstat -aon | grep 5053
# netstat -aon | grep 5044
tcp        0      0 0.0.0.0:5044            0.0.0.0:*               LISTEN      off (0.00/0/0)

I would expect that to be port => 5053 (an integer rather than a string), but logstash is good about doing basic conversions: string to integer, hash to array, and so on.

Does that 5044 listener go away when you stop the logstash instance? Is it really just ignoring the port option?

Yes, the 5044 listener goes away when logstash stops; it belongs to another pipeline that is further up in pipelines.yml. I'm not showing it here, but it's a very simple multiple-pipeline config. Previously, I had a path.config file configured for port 5053 and that was working properly. Only now that I am trying to set up this pipeline-to-pipeline config does it fail to create a listener.

example of redacted portion of pipelines.yml
- pipeline.id: my-pipeline_1
  path.config: "/etc/path/to/p1.config"
- pipeline.id: my-other-pipeline
  path.config: "/etc/different/path/p2.cfg"

I'll test the syntax you suggested to see if that makes a difference.

I am starting to wonder if this is a YAML multiline issue. Can you try

config.string: "input { beats { port => 5053 } } output { if [type] == log { pipeline { send_to => legacy-gateway } } else if [type] == something { pipeline { send_to => somethingelse } } }"

I had to make each pipeline a single line, and vim complained about nesting double quotes inside a double-quoted value, so I switched the outer quotes to single quotes. It turned out like this:

- pipeline.id: master-pipeline
  config.string: 'input { beats { port => "5053" } } output { if [type] == log { pipeline { send_to => legacy-gateway } } else if [type] == something { pipeline { send_to => somethingelse } } }'
- pipeline.id: legacy-gateway
  config.string: 'input { pipeline { address => legacy-gateway } } output { elasticsearch { hosts => ["ES CLUSTER ADDRESS"] index => "legacy-gateway" } }'

I noticed a new error on the journalctl so I took a closer look.
:exception=>"LogStash::ConfigurationError", :message=>"Expected one of #, { at line 1, column 46 (byte 46) after input { pipeline { address => legacy-gateway ",
and
:exception=>"LogStash::ConfigurationError", :message=>"Expected one of #, ( at line 1, column 62 (byte 62) after output { if [type] == log "

It seems like it doesn't like the if statement. Any idea why? Looking at the example, it should be identical.
I even reverted back to the multiline statement and see the same error, at a different position of course. Still, my apologies for not spotting this error sooner.

edit > I went back to the pipeline.conf to test --config.test_and_exit and got the following:

[root@rnh03velk01.prd: /etc/logstash/conf.d]# /usr/share/logstash/bin/logstash --config.test_and_exit -f /etc/logstash/conf.d/master-pipeline.conf
~~~~~deleted the typical stuff here~~~~~
    [FATAL] 2019-08-06 20:43:27.960 [LogStash::Runner] runner - The given configuration is invalid. Reason: Expected one of #, ( at line 10, column 20 (byte 87) after output {
  if [type] == log
[ERROR] 2019-08-06 20:43:27.968 [LogStash::Runner] Logstash - java.lang.IllegalStateException: Logstash stopped processing because of an error: (SystemExit) exit
[root@rnh03velk01.prd: /etc/logstash/conf.d]# cat master-pipeline.conf
input {
  beats {
    port => "5053"
  }
}
#filter {
#
#}
output {
  if [type] == log {
    pipeline { send_to => legacy-gateway }
  }
}

Counting it out, the character it complains about is the { after the statement if [type] == log. I really don't understand what it would expect there instead. I could try matching a different pattern, but that wouldn't let me split the pipelines. I don't think it's a YAML multiline issue, or it would probably fail earlier in either file. It seems to be expecting something very specific, and whatever that is does not match the doc for this feature.

"log" should be in quotes. So should "legacy-gateway".
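Applied to the config above, the corrected conditional would read:

output {
  if [type] == "log" {
    pipeline { send_to => "legacy-gateway" }
  }
}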

I've added quotes to each of those values and tested both files. Logstash reported Configuration OK for each. I pointed pipelines.yml back at the files with path.config and started logstash successfully, now with a listener on my expected port!

I tested turning on the filebeat that is reporting to 5053 but I am not seeing the data making it to ES. Trying to do my due diligence though, I ran a tcpdump and see plenty of packets flying in when I start the filebeat, so it looks like the network is fine, master-pipeline listener is set up, and logstash thinks my config is okay. Is there a way we can test to see where it might be breaking down in the remaining steps? Can we see if the virtual address is active, perhaps?

For testing I would remove the conditional and just send everything to "legacy-gateway". Once that is working you can worry about filtering with conditionals.
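In other words, strip the output down to just:

output {
  pipeline { send_to => "legacy-gateway" }
}

Once events arrive downstream with that in place, reintroduce the conditional.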

Right, so to be clear I had this working before. Single file with beat input on 5053, output to my ES cluster on index "legacy-gateway". Now I have a need to set up this conditional so that I can pull in many log files which need dramatically different filters.

As a sanity check, I performed this test again. Commented out the existing output, uncommented the old output (copied from legacy-gateway-pipeline to master-pipeline), commented out legacy-gateway entries in pipelines.yml. Test is a success, the documents get formed and my deleted index is re-formed.

I wonder if - type: log is the appropriate syntax on the filebeat side? I ran a test adding quotes (- type: "log") to be sure, as I don't like to be fooled twice. I then reverted my configuration to enable the conditional statement and the legacy-gateway pipeline, deleted my index and the filebeat registry, and started everything up once again, but once again I have no data visible in ES.

If you add a stdout output what do the events look like in ruby debug? Do they have a type field?

I haven't used this output plugin before, but it was simple enough to integrate. Is this what we're expecting?

Aug 07 13:19:41 rnh03velk01 logstash[8699]: "type" => "log"
Aug 07 13:19:41 rnh03velk01 logstash[8699]: "type" => "filebeat",
Aug 07 13:19:41 rnh03velk01 logstash[8699]: "type" => "log"
Aug 07 13:19:41 rnh03velk01 logstash[8699]: "type" => "filebeat",

Normally I would use

output { stdout { codec => rubydebug { } } }

which will cause it to dump the whole event neatly formatted.

It prints the same way when I add the codec; the plugin doc says rubydebug is the default codec anyway. The doc also says it prints to the shell running the command, but I'm starting logstash as a service, running as root.

I commented out the paths and such in pipelines.yml and restarted logstash so that the ports would not be consumed when attempting to execute a specific pipeline.

/usr/share/logstash/bin/logstash -f /etc/logstash/conf.d/master-pipeline.conf

contains some results like:

         "input" => {
        "type" => "log"

A direct comparison to the conditional:
  stdout { codec => rubydebug { } }
    if [type] == "log" {
      pipeline { send_to => "legacy-gateway" }
    }

Is the == vs => a problem? Without knowing what I'm talking about, this looks healthy to me. If you agree, I was wondering if I should use "tags" on the logs instead of using the "type" conditional?

edit > Starting to feel like I need to find a class about yaml.

I would agree that might be a good idea. There is a lot of baggage associated with type.
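One caveat worth noting (an assumption on my part, since the thread does not show it): beats ships tags as an array, so a conditional on it normally uses the in operator rather than ==. A sketch:

output {
  if "legacy-gateway" in [tags] {
    pipeline { send_to => "legacy-gateway" }
  }
}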

Alright. I have:

  if [tags] == "legacy-gateway" {
    pipeline { send_to => "legacy-gateway" }
  }

I also enabled master-pipeline in pipelines.yml and added a stdout output to legacy-gateway. I'm not seeing my VIP's health checks or the log files making it to stdout on the legacy-gateway pipeline, although I don't know whether this test is even valid, since the pipeline virtual addresses may be created at different times?

I tried commenting out the conditional as well, but not sure if this configuration is supported. I really would love a doc with some more information on how this "pipeline" core component works, as it seems to be the last thing hanging me up:

output {
  stdout { codec => rubydebug { } }
#  if [tags] == "legacy-gateway" {
    pipeline { send_to => "legacy-gateway" }
#  }
}

Edit > Oh my, I think I've solved it: you can't have a - in the pipeline virtual address name.
Let me test the conditional and see if I can move on with this thing.
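Assuming the hyphen was indeed the culprit, the working pair would look something like this, with underscores in the virtual address (only the address is renamed; pipeline.id and index names keep their hyphens, and the in [tags] form is an assumption since tags is an array):

- pipeline.id: master-pipeline
  config.string: |
    input { beats { port => 5053 } }
    output {
      if "legacy-gateway" in [tags] {
        pipeline { send_to => "legacy_gateway" }
      }
    }
- pipeline.id: legacy-gateway
  config.string: |
    input { pipeline { address => "legacy_gateway" } }
    output {
      elasticsearch {
        hosts => ["ES CLUSTER ADDRESS"]
        index => "legacy-gateway"
      }
    }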