Hello all, I am trying to implement multiple pipelines, but it appears the output of one is being sent to two indices; its own and the other pipelines.
$logstash --version
Using bundled JDK: /usr/share/logstash/jdk
logstash 7.16.2
Helm chart configuration snippet:
logstashConfig:
logstash.yml: |
http.host: 0.0.0.0
monitoring.elasticsearch.hosts: http://elasticsearch-master.efk.svc:9200
pipeline.yml: |
- pipeline.id: appBeats
path.config: "/usr/share/logstash/pipeline/logstash.conf"
- pipeline.id: appKafka
path.config: "/usr/share/logstash/pipeline/app-kafka.conf"
# Allows you to add any pipeline files in /usr/share/logstash/pipeline/
### ***warn*** there is a hardcoded logstash.conf in the image, override it first
logstashPipeline:
logstash.conf: |-
{{ .Files.Get "pipelines/app-beats.conf" }}
app-kafka.conf: |-
{{ .Files.Get "pipelines/app-kafka.conf"}}
Executing into the Logstash pod and verifying pipelines.yml
and pipelines.yml
:
bash-4.2$ more config/pipelines.yml
# This file is where you define your pipelines. You can define multiple.
# For more information on multiple pipelines, see the documentation:
# https://www.elastic.co/guide/en/logstash/current/multiple-pipelines.html
- pipeline.id: main
path.config: "/usr/share/logstash/pipeline"
bash-4.2$ more config/pipeline.yml
- pipeline.id: appBeats
path.config: "/usr/share/logstash/pipeline/logstash.conf"
- pipeline.id: appKafka
path.config: "/usr/share/logstash/pipeline/app-kafka.conf"
For some reason inputs from pipeline appBeats
(which overwrites the default logstash.conf
) are being written to the output of the appKafka
.
Note while both have the same output host, the index is completely different. Not sure if I am missing something here, but the expectation would be that these pipelines are entirely isolated.
Pipeline conf examples:
input {
beats {
port => 5045
codec => json
}
filter{}
output {
elasticsearch {
hosts => ["${OUTPUT_HOST}"]
user => "${ELASTIC_USER}"
password => "${ELASTIC_PASS}"
index => "app-%{[fields][index_name]}-%{+yyyy.MM.dd}"
}
}
vs.
input {
kafka {
bootstrap_servers => "kafka-headless.kafka.svc.cluster.local:9092"
topics => ["distojson"]
client_id => "distojson-logstash"
group_id => "distojson-logstash"
decorate_events => basic
codec => "json"
max_partition_fetch_bytes => "1500000"
}
}
filter {
}
output {
elasticsearch {
hosts => ["${OUTPUT_HOST}"]
user => "${ELASTIC_USER}"
password => "${ELASTIC_PASS}"
index => "distojson-%{+yyyy.MM.dd}"
}
}