Hello all, I am trying to implement multiple pipelines, but the output of one appears to be sent to two indices: its own and the other pipeline's.
$ logstash --version
Using bundled JDK: /usr/share/logstash/jdk
logstash 7.16.2
Helm chart configuration snippet:
logstashConfig:
  logstash.yml: |
    http.host: 0.0.0.0
    monitoring.elasticsearch.hosts: http://elasticsearch-master.efk.svc:9200
  pipeline.yml: |
    - pipeline.id: appBeats
      path.config: "/usr/share/logstash/pipeline/logstash.conf"
    - pipeline.id: appKafka
      path.config: "/usr/share/logstash/pipeline/app-kafka.conf"
# Allows you to add any pipeline files in /usr/share/logstash/pipeline/
### ***warn*** there is a hardcoded logstash.conf in the image, override it first
logstashPipeline:
  logstash.conf: |-
    {{ .Files.Get "pipelines/app-beats.conf" }}
  app-kafka.conf: |-
    {{ .Files.Get "pipelines/app-kafka.conf"}}
Exec'ing into the Logstash pod and checking pipelines.yml and pipeline.yml:
bash-4.2$ more config/pipelines.yml
# This file is where you define your pipelines. You can define multiple.
# For more information on multiple pipelines, see the documentation:
# https://www.elastic.co/guide/en/logstash/current/multiple-pipelines.html
- pipeline.id: main
  path.config: "/usr/share/logstash/pipeline"
bash-4.2$ more config/pipeline.yml
- pipeline.id: appBeats
  path.config: "/usr/share/logstash/pipeline/logstash.conf"
- pipeline.id: appKafka
  path.config: "/usr/share/logstash/pipeline/app-kafka.conf"
For some reason, events from the appBeats pipeline (which overrides the default logstash.conf) are also being written to the output of the appKafka pipeline.
Note that while both outputs use the same host, the indices are completely different. I am not sure if I am missing something here, but I would expect these pipelines to be entirely isolated.
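One thing that stands out from the pod listing above, offered as a possible cause rather than a confirmed diagnosis: Logstash reads its pipeline definitions from config/pipelines.yml (plural), and that file still contains the default main pipeline pointing at the whole /usr/share/logstash/pipeline directory, while my definitions ended up in config/pipeline.yml. When path.config is a directory, Logstash concatenates every config file in it into one pipeline, so every event would reach both outputs. A minimal sketch of the values change I would try, assuming the chart writes each logstashConfig key to a file of the same name under config/:

```yaml
logstashConfig:
  logstash.yml: |
    http.host: 0.0.0.0
    monitoring.elasticsearch.hosts: http://elasticsearch-master.efk.svc:9200
  # Assumption: the key name becomes the file name under /usr/share/logstash/config/,
  # so "pipelines.yml" (plural) would replace the image default instead of sitting next to it.
  pipelines.yml: |
    - pipeline.id: appBeats
      path.config: "/usr/share/logstash/pipeline/logstash.conf"
    - pipeline.id: appKafka
      path.config: "/usr/share/logstash/pipeline/app-kafka.conf"
```

If that assumption holds, config/pipelines.yml in the pod should list appBeats and appKafka instead of main after a redeploy.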
Pipeline conf examples:
input {
  beats {
    port => 5045
    codec => json
  }
}
filter {}
output {
  elasticsearch {
    hosts => ["${OUTPUT_HOST}"]
    user => "${ELASTIC_USER}"
    password => "${ELASTIC_PASS}"
    index => "app-%{[fields][index_name]}-%{+yyyy.MM.dd}"
  }
}
vs.
input {
  kafka {
    bootstrap_servers => "kafka-headless.kafka.svc.cluster.local:9092"
    topics => ["distojson"]
    client_id => "distojson-logstash"
    group_id => "distojson-logstash"
    decorate_events => basic
    codec => "json"
    max_partition_fetch_bytes => "1500000"
  }
}
filter {
}
output {
  elasticsearch {
    hosts => ["${OUTPUT_HOST}"]
    user => "${ELASTIC_USER}"
    password => "${ELASTIC_PASS}"
    index => "distojson-%{+yyyy.MM.dd}"
  }
}