Hello,
I have been looking for more in depth information on the Pipeline to Pipeline functionality in Logstash, specifically version 6.4.3. I know it is still in beta, but I have been trying to get it to work in a dev environment.
I have configured a distributor pattern in my pipelines.yml. I can see events come into the first pipeline from Beats, but no events seem to be emitted through the virtual addresses to the other pipelines. I have verified that the pipeline addresses are correct, and I know my Logstash server is working as other pipelines for Metricbeat are receiving and emitting events.
I am also using Centralized Pipeline Management, and that seems to be working as well.
Here are my configs:
Logstash 6.4.3
RHEL 7
pipelines.yml
- pipeline.id: main
path.config: "/etc/logstash/conf.d/*.conf"
- pipeline.id: dev-filebeat-sort
config.string: |
input {
beats {
client_inactivity_timeout => 1200
port => 5044
}
}
output {
if [tags] == "registry_json" {
pipeline { send to => "dev-virt" }
} else if [tags] == "dev-lexmark-printer" {
pipeline { send to => "dev-lexmark_printer-virt" }
} else if [tags] == "dev-btr" {
pipeline { send to => "dev-btr-virt" }
} else if [tags] == "dev-toast" {
pipeline { send to => "dev-toast-virt" }
}
}
- pipeline.id: dev-registry
config.string: |
input { pipeline { address => "dev-virt" } }
filter {
_proprietary stuff here..._
}
output {
elasticsearch {
hosts => ["hosthere:9200"]
user => "user"
password => "secret"
manage_template => false
index => "dev-%{[@metadata][beat]}-%{[@metadata][version]}-%{+YYYY.MM.dd}"
document_type => "%{[@metadata][type]}"
}
}
- pipeline.id: dev-lexmark_printer-test
config.string: |
input { pipeline { address => "dev-lexmark_printer-virt" } }
filter {
if [tags] == "dev-lexmark-printer" {
json {
id => "0001"
source => "message"
tag_on_failure => ["_dev_lexmark_printer_jsonparsefailure"]
}
}
}
output {
elasticsearch {
hosts => ["hosthere:9200"]
user => "user"
password => "secret"
manage_template => false
index => "dev-lexmark-printer-%{+YYYY.MM.dd}"
document_type => "%{[@metadata][type]}"
}
}
- pipeline.id: dev-btr-test
config.string: |
input { pipeline { address=> "dev-btr-virt" } }
filter {
if [tags] == "dev-btr" {
json {
id => "0001"
source => "message"
tag_on_failure => ["_btr_jsonparsefailure"]
}
}
}
output {
elasticsearch {
hosts => ["hosthere:9200"]
user => "user"
password => "secret"
manage_template => false
index => "dev-btr-%{+YYYY.MM.dd}"
document_type => "%{[@metadata][type]}"
}
}
- pipeline.id: dev-toast-test
config.string: |
input { pipeline { address => "dev-toast-virt" } }
filter {
if [tags] == "dev-toast" {
json {
id => "0001"
source => "message"
tag_on_failure => ["_dev_toast_jsonparsefailure"]
}
}
}
output {
elasticsearch {
hosts => ["hosthere:9200"]
user => "user"
password => "secret"
manage_template => false
index => "dev-toast-%{+YYYY.MM.dd}"
document_type => "%{[@metadata][type]}"
}
}
Logstash.yml
path.data: /var/lib/logstash config.reload.automatic: true config.reload.interval: 10s http.host: "iphere" http.port: 9600-9700 log.level: info path.logs: /var/log/logstash xpack.monitoring.enabled: true xpack.monitoring.elasticsearch.username: user xpack.monitoring.elasticsearch.password: "secret" xpack.monitoring.elasticsearch.url: "http://iphere:9200" xpack.monitoring.collection.interval: 10s xpack.monitoring.collection.pipeline.details.enabled: true xpack.management.enabled: true xpack.management.pipeline.id: ["main", "dev-filebeat-sort", "dev-registry", "dev-lexmark_printer-test", "dev-btr-test", "dev-toast-test", "dev-me$ xpack.management.elasticsearch.username: user xpack.management.elasticsearch.password: "secret" xpack.management.elasticsearch.url: "http://iphere:9200" xpack.management.logstash.poll_interval: 15s
I am also looking for more than the Pipeline to Pipeline page in the documents. There is no information on how or even if Pipeline to Pipeline is compatible with Centralized Pipeline management.
Does anyone have information on this?
Thanks