Filebeat Kafka module with autodiscover

Hi,
I'm trying to ship logs from a Kafka cluster hosted in a Kubernetes cluster to Elasticsearch through Logstash, using the Filebeat Kafka module. I configured my filebeat.yml and Logstash pipeline as described in the documentation (we are using autodiscover), but I still can't see the new Kafka index and dashboard in Kibana.

We are running Filebeat 7.10.

filebeat.yml is configured as follows:

  filebeat.yml: |-
    filebeat.config:
    processors:
    - drop_fields:
        fields: ["agent.ephemeral_id", "agent.hostname", "agent.name", "agent.id", "agent.type",
          "agent.version", "ecs.version", "input.type", "suricata.eve.timestamp", "kubernetes.pod.uid",
          "log.file.path", "log.offset", "kubernetes.labels.app_kubernetes_io/part-of",
          "kubernetes.labels.pod-template-hash", "kubernetes.labels.app"]
    filebeat.autodiscover:
      providers:
        - type: kubernetes
          node: ${NODE_NAME}
          hints.enabled: false
          templates:
            - condition:
                equals:
                  kubernetes.labels.app: "web"
              config:
                - type: container
                  paths:
                    - /var/log/containers/*-${data.kubernetes.container.id}.log
                  exclude_lines: ["^\\s+[\\-`('.|_]"]
        - type: docker
          templates:
            - condition:
                contains:
                  docker.container.image: kafka
              config:
                - module: kafka
                  log:
                    input:
                      type: container
                      paths:
                        - /var/lib/docker/containers/${data.docker.container.id}/*.log

    logging:
      level: debug
    output.logstash:
      hosts: ["logstash.prod:30202"]
    http:
      enabled: true
      port: 5066
      host: 0.0.0.0
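One thing I have seen mentioned for this kind of setup: when Filebeat ships to Logstash instead of directly to Elasticsearch, the Kafka module's ingest pipelines and Kibana dashboards are not loaded automatically; `filebeat setup` has to be run once against Elasticsearch itself. A minimal sketch, assuming placeholder host names (`elasticsearch.prod:9200` and `kibana.prod:5601` are not taken from the config above):

```shell
# Load the Kafka module's ingest pipelines into Elasticsearch (run once).
# NOTE: the Elasticsearch and Kibana hosts below are placeholder assumptions.
filebeat setup --pipelines --modules kafka \
  -E output.logstash.enabled=false \
  -E 'output.elasticsearch.hosts=["elasticsearch.prod:9200"]'

# Load the module's Kibana dashboards as well.
filebeat setup --dashboards \
  -E output.logstash.enabled=false \
  -E 'output.elasticsearch.hosts=["elasticsearch.prod:9200"]' \
  -E 'setup.kibana.host="kibana.prod:5601"'
```

Without this step, events still arrive, but the module's parsing pipeline and the Kafka dashboard never exist in the cluster.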

logstash.conf is configured as follows:

  logstash.conf: |
    input {
      beats {
        port => 5044
        client_inactivity_timeout => 7200
      }
      http { port => 8080 }
    }
    filter {
        json {
            source => "message"
            ecs_compatibility => "disabled"
        }
        if ![severity] and [level] {
            mutate {
                replace => { "severity" => "%{level}" }
            }
        }

        mutate {
            lowercase => [ "severity" ]
        }

        if ![service] and [kubernetes][container][name] {
            mutate {
                add_field => { "service" => "%{[kubernetes][container][name]}" }
            }
        } 
    }
    output {
      if [@metadata][pipeline] {
        elasticsearch {
          hosts => ["${ELASTICSEARCH_HOST}:${ELASTICSEARCH_PORT}"]
          manage_template => false
          index => "%{[@metadata][beat]}-%{[@metadata][version]}-%{+YYYY.MM.dd}"
          pipeline => "%{[@metadata][pipeline]}" 
        }
      } else {
        elasticsearch {
          hosts => ["${ELASTICSEARCH_HOST}:${ELASTICSEARCH_PORT}"]
          manage_template => false
          template_overwrite => false
          template_name => "logstash-logs"
          index => "logs-%{+yyyy.MM.dd}"
        }
      }
    }
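On the Logstash side, one detail that can bite here: the `json` filter runs on every event, but Kafka broker logs collected by the module are plain text, so those events get tagged with `_jsonparsefailure`. A hedged sketch of guarding the filter (not from the original pipeline, just one way to do it):

```conf
filter {
  # Only attempt JSON decoding when the message looks like a JSON object;
  # plain-text Kafka broker log lines would otherwise be tagged with
  # _jsonparsefailure on every event.
  if [message] =~ /^\{/ {
    json {
      source => "message"
      ecs_compatibility => "disabled"
    }
  }
}
```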

Is there something I'm missing?

Thanks.

This topic was automatically closed 28 days after the last reply. New replies are no longer allowed.