Logstash Pipeline to Pipeline issue and looking for more info

Hello,

I have been looking for more in depth information on the Pipeline to Pipeline functionality in Logstash, specifically version 6.4.3. I know it is still in beta, but I have been trying to get it to work in a dev environment.

I have configured a distributor pattern in my pipelines.yml. I can see events come into the first pipeline from Beats, but no events seem to be emitted through the virtual addresses to the other pipelines. I have verified that the pipeline addresses are correct, and I know my Logstash server is working as other pipelines for Metricbeat are receiving and emitting events.

I am also using Centralized Pipeline Management, and that seems to be working as well.

Here are my configs:
Logstash 6.4.3
RHEL 7

pipelines.yml
    - pipeline.id: main
      path.config: "/etc/logstash/conf.d/*.conf"

- pipeline.id: dev-filebeat-sort
  config.string: |
    input {
      beats {
        client_inactivity_timeout => 1200
        port => 5044
      }
  }
    output {
        if [tags] == "registry_json" {
          pipeline { send to => "dev-virt" }
      } else if [tags] == "dev-lexmark-printer" {
          pipeline { send to => "dev-lexmark_printer-virt" }
      } else if [tags] == "dev-btr" {
          pipeline { send to => "dev-btr-virt" }
      } else if [tags] == "dev-toast" {
          pipeline { send to => "dev-toast-virt" }
      }
    }

- pipeline.id: dev-registry
  config.string: |
    input { pipeline { address => "dev-virt" } }
    filter {
      _proprietary stuff here..._ 
}

     output {
     elasticsearch {
     hosts => ["hosthere:9200"]
     user => "user"
     password => "secret"
     manage_template => false
     index => "dev-%{[@metadata][beat]}-%{[@metadata][version]}-%{+YYYY.MM.dd}"
     document_type => "%{[@metadata][type]}"
     }
  }
- pipeline.id: dev-lexmark_printer-test
  config.string: |
    input { pipeline { address => "dev-lexmark_printer-virt" } }
    filter {
      if [tags] == "dev-lexmark-printer" {
        json {
            id => "0001"
            source => "message"
            tag_on_failure => ["_dev_lexmark_printer_jsonparsefailure"]
          }
      }
  }
     output {
     elasticsearch {
     hosts => ["hosthere:9200"]
     user => "user"
     password => "secret"
     manage_template => false
     index => "dev-lexmark-printer-%{+YYYY.MM.dd}"
     document_type => "%{[@metadata][type]}"
     }
  }

- pipeline.id: dev-btr-test
  config.string: |
    input { pipeline { address=> "dev-btr-virt" } }
    filter {
      if [tags] == "dev-btr" {
         json {
             id => "0001"
             source => "message"
             tag_on_failure => ["_btr_jsonparsefailure"]
        }
     }
  }
    output {
    elasticsearch {
     hosts => ["hosthere:9200"]
     user => "user"
     password => "secret"
    manage_template => false
    index => "dev-btr-%{+YYYY.MM.dd}"
    document_type => "%{[@metadata][type]}"
    }
  }
- pipeline.id: dev-toast-test
  config.string: |
    input { pipeline { address => "dev-toast-virt" } }
    filter {
      if [tags] == "dev-toast" {
        json {
          id => "0001"
          source => "message"
          tag_on_failure => ["_dev_toast_jsonparsefailure"]
        }
     }
  }
     output {
     elasticsearch {
     hosts => ["hosthere:9200"]
     user => "user"
     password => "secret"
     manage_template => false
     index => "dev-toast-%{+YYYY.MM.dd}"
     document_type => "%{[@metadata][type]}"
    }
  }

Logstash.yml

path.data: /var/lib/logstash
config.reload.automatic: true
config.reload.interval: 10s
http.host: "iphere"
http.port: 9600-9700
log.level: info
path.logs: /var/log/logstash
xpack.monitoring.enabled: true
xpack.monitoring.elasticsearch.username: user
xpack.monitoring.elasticsearch.password: "secret"
xpack.monitoring.elasticsearch.url: "http://iphere:9200"
xpack.monitoring.collection.interval: 10s
xpack.monitoring.collection.pipeline.details.enabled: true
xpack.management.enabled: true
xpack.management.pipeline.id: ["main", "dev-filebeat-sort", "dev-registry", "dev-lexmark_printer-test", "dev-btr-test", "dev-toast-test", "dev-me$
xpack.management.elasticsearch.username: user
xpack.management.elasticsearch.password: "secret"
xpack.management.elasticsearch.url: "http://iphere:9200"
xpack.management.logstash.poll_interval: 15s

I am also looking for more than the Pipeline to Pipeline page in the documents. There is no information on how or even if Pipeline to Pipeline is compatible with Centralized Pipeline management.
Does anyone have information on this?

Thanks

You might do better with an array membership test.

if "dev-toast" in [tags]

@Badger Updated the config and it seemed to send on through. I am still chasing a different issue from Filebeat, so all I got was tagged JSON failures. But, at least it is something.

Still need more info on Centralized Pipeline management and Pipeline to Pipeline. I am not sure I am configuring the Logstash Pipelines in the pipelines.yml and in the /etc/logstash/conf.d/ directory.

Thanks

This topic was automatically closed 28 days after the last reply. New replies are no longer allowed.