Strange kafka output plugin UNKNOWN_TOPIC_OR_PARTITION


(Luca Malossi) #1

Hi All,
I have Logstash configured to publish to three Kafka topics: metrics, Apache logs, and WebLogic logs.
If I use two of the three topics, everything works fine. As soon as I start using the third one, I get:

[2019-03-15T18:22:12,194][WARN ][org.apache.kafka.clients.NetworkClient] [Producer clientId=kafka-weblogic] Error while fetching metadata with correlation id 1078 : {epo_Weblogic_Log=UNKNOWN_TOPIC_OR_PARTITION}

Any ideas?

Here is my pipeline configuration:

# Event sources: syslog over TCP and UDP (port 514) and Beats (port 5044).
input {
  # Syslog listeners; both set type to "syslog".
  tcp {
    type => "syslog"
    port => 514
  }
  udp {
    type => "syslog"
    port => 514
  }

  # Beats listener (filebeat / metricbeat shippers are routed in the output section).
  beats {
    type => "beats"
    port => 5044
  }
}

# Discard every event whose type is "syslog"; such events never reach the outputs.
filter {
  if [type] == "syslog" {
    drop {}
  }
}

# Routing:
#   filebeat events   -> Elasticsearch, plus a Kafka topic selected by [fields][layer]
#   metricbeat events -> Elasticsearch, plus the metrics Kafka topic
# Events from any other source match neither branch and are not sent anywhere.
output {
  #stdout { codec => rubydebug }

  if [@metadata][beat] == "filebeat" {
    elasticsearch {
      hosts => [ "elastic-ls-as2-pr.services.eni.intranet:9200" ]
      # `index` is a single string option, so pass a plain string rather than a one-element array.
      index => "%{[@metadata][beat]}-%{[@metadata][version]}-%{+YYYY.MM.dd}"
    }

    # Apache logs go to their own topic.
    if [fields][layer] == "apache" {
      kafka {
        id => "kafka-apache"
        topic_id => "epo_Apache_Log"
        bootstrap_servers => "02srv00ek2.ad02.eni.intranet:9092, 02srv00ek3.ad02.eni.intranet:9092, 02srv00ek4.ad02.eni.intranet:9092"
        compression_type => "none"
        jaas_path => "/etc/logstash/conf.d/kafka-conf/jaas.conf"
        kerberos_config => "/etc/logstash/conf.d/kafka-conf/krb5.conf"
        sasl_kerberos_service_name => "kafka"
        security_protocol => "SASL_PLAINTEXT"
        client_id => "kafka-apache"
      }
    }

    # WebLogic logs go to their own topic.
    # NOTE(review): this block mirrors the apache one except for id, topic_id and client_id,
    # so a topic-specific failure here is presumably broker-side (topic missing, or the
    # Kerberos principal lacks access to it) - confirm on the Kafka cluster.
    if [fields][layer] == "weblogic" {
      kafka {
        id => "kafka-weblogic"
        topic_id => "epo_Weblogic_Log"
        bootstrap_servers => "02srv00ek2.ad02.eni.intranet:9092, 02srv00ek3.ad02.eni.intranet:9092, 02srv00ek4.ad02.eni.intranet:9092"
        compression_type => "none"
        jaas_path => "/etc/logstash/conf.d/kafka-conf/jaas.conf"
        kerberos_config => "/etc/logstash/conf.d/kafka-conf/krb5.conf"
        sasl_kerberos_service_name => "kafka"
        security_protocol => "SASL_PLAINTEXT"
        client_id => "kafka-weblogic"
      }
    }
  } else if [@metadata][beat] == "metricbeat" {
    elasticsearch {
      hosts => [ "elastic-ls-as2-pr.services.eni.intranet:9200" ]
      # `index` is a single string option, so pass a plain string rather than a one-element array.
      index => "%{[@metadata][beat]}-%{[@metadata][version]}-%{+YYYY.MM.dd}"
    }

    # All metricbeat events go to the metrics topic.
    kafka {
      id => "kafka-metrics"
      topic_id => "epo_Metrics_Log"
      bootstrap_servers => "02srv00ek2.ad02.eni.intranet:9092, 02srv00ek3.ad02.eni.intranet:9092, 02srv00ek4.ad02.eni.intranet:9092"
      compression_type => "none"
      jaas_path => "/etc/logstash/conf.d/kafka-conf/jaas.conf"
      kerberos_config => "/etc/logstash/conf.d/kafka-conf/krb5.conf"
      sasl_kerberos_service_name => "kafka"
      security_protocol => "SASL_PLAINTEXT"
      client_id => "kafka-metrics"
    }
  }
}

#2

Do you have evidence that this topic exists?