Hi, I have deployed ECK on Kubernetes and now I want to use fluentd to collect logs from other applications on Kubernetes and Kafka topic, it collected logs from the other application I want but not from Kafka topic. This is my fluentd configuration:
apiVersion: v1
data:
fluent.conf: |
<label @FLUENT_LOG>
<match fluent.**>
@type null
</match>
</label>
<match kubernetes.var.log.containers.**kube-system**.log>
@type null
</match>
<source>
@type tail
path /var/log/containers/*.log
pos_file /var/log/app.log.pos
tag "#{ENV['FLUENT_CONTAINER_TAIL_TAG'] || 'kubernetes.*'}"
read_from_head true
<parse>
@type "#{ENV['FLUENT_CONTAINER_TAIL_PARSER_TYPE'] || 'json'}"
time_format %Y-%m-%dT%H:%M:%S.%NZ
</parse>
</source>
<source>
@type kafka
brokers my-cluster-kafka-bootstrap.kafka:9092
<topic>
topic event-log
</topic>
format json
tag "#{ENV['FLUENT_CONTAINER_TAIL_TAG'] || 'kubernetes.*'}"
read_from_head true
<parse>
@type "#{ENV['FLUENT_CONTAINER_TAIL_PARSER_TYPE'] || 'json'}"
time_format %Y-%m-%dT%H:%M:%S.%NZ
</parse>
</source>
<filter kubernetes.**>
@type kubernetes_metadata
</filter>
<filter kubernetes.**>
@type grep
<exclude>
key log
pattern (.\[notice]\.*|^[ \\\/\(\)\*\|_]+(?!.*[a-zA-Z0-9]).*$|^\s*$|.*GET*|.*POST*)
</exclude>
<exclude>
key $.kubernetes.namespace_name
pattern ^(?!^(default|ingress-nginx-ci|kafka)$).*
</exclude>
<exclude>
key $.kubernetes.container_name
pattern ^(?!^(utms-live-backend|client-interface|rm|rmc|utms-da-report-frontend|utms-live-frontend|utms-app|controller|sidecar-container|utms-da-report-backend)$).*
</exclude>
</filter>
<match kubernetes.**>
@type rewrite_tag_filter
<rule>
key $.kubernetes.namespace_name
pattern ^(.+)$
tag $1
</rule>
</match>
<match **>
@type elasticsearch
@log_level info
include_tag_key true
host "#{ENV['FLUENT_ELASTICSEARCH_HOST']}"
port "#{ENV['FLUENT_ELASTICSEARCH_PORT']}"
user "#{ENV['FLUENT_ELASTICSEARCH_USER']}"
password "#{ENV['FLUENT_ELASTICSEARCH_PASSWORD']}"
scheme "#{ENV['FLUENT_ELASTICSEARCH_SCHEME'] || 'http'}"
ssl_verify "#{ENV['FLUENT_ELASTICSEARCH_SSL_VERIFY'] || 'true'}"
reload_connections "#{ENV['FLUENT_ELASTICSEARCH_RELOAD_CONNECTIONS'] || 'false'}"
reconnect_on_error "#{ENV['FLUENT_ELASTICSEARCH_RECONNECT_ON_ERROR'] || 'true'}"
reload_on_failure "#{ENV['FLUENT_ELASTICSEARCH_RELOAD_ON_FAILURE'] || 'true'}"
sniffer_class_name "#{ENV['FLUENT_SNIFFER_CLASS_NAME'] || 'Fluent::Plugin::ElasticsearchSimpleSniffer'}"
logstash_format true
logstash_prefix "${tag}"
<buffer>
@type file
path /var/log/fluentd-buffers/kubernetes.system.buffer
flush_mode interval
retry_type exponential_backoff
flush_thread_count 8
flush_interval 5s
retry_forever true
retry_max_interval 30
chunk_limit_size 2M
queue_limit_length 32
overflow_action block
</buffer>
</match>
kind: ConfigMap
metadata:
name: fluentd-config
namespace: elastic-system
What am I doing wrong?