Kafka 0 partition metadata cannot be read in logstash6.8, other partitions can. Sample configuration:

kafka 0 partition metadata cannot be read in logstash6.8, other partitions can. Sample configuration:

input {
	kafka {
		client_id => "ycUsrRdNews"
		consumer_threads => 4
		bootstrap_servers => "${KAFKA_BOOTSTRAP_SERVERS}"
		topics => "ycUsrRdNews"
		group_id => "ycUsrRdNewsConsumerGroup"
		auto_offset_reset => "latest"
		decorate_events => true
		type => "ycUsrRdNews"
	}
    kafka {
        client_id => "ycUsrRdNews2"
        consumer_threads => 4
        bootstrap_servers => "${KAFKA_BOOTSTRAP_SERVERS_2}"
        topics => "ycUsrRdNews"
        group_id => "ycUsrRdNewsConsumerGroup2"
        auto_offset_reset => "latest"
        decorate_events => true
        type => "ycUsrRdNews"
    }
}

filter {
    if [type] == "ycUsrRdNews" {
    	json {
    	  source => "message"
    	  skip_on_invalid_json => true
          add_field => {
            "kafkaInfo" => "%{[@metadata][kafka][topic]}-%{[@metadata][kafka][consumer_group]}-%{[@metadata][kafka][partition]}-%{[@metadata][kafka][offset]}-%{[@metadata][kafka][timestamp]}"
          }
    	}
    }
}

output {
    if [type] == "ycUsrRdNews" {
        #stdout { codec => rubydebug }
        elasticsearch {
            hosts => ["${ELASTICSEARCH_HOSTS}"]
            index => "ycusrrd-news%{+YYYYMMdd}"
            user => "${ELASTICSEARCH_APP_LOG_USER}"
            password => "${ELASTICSEARCH_PASSWORD}"
        }

        if [level] == "ERROR" {
            elasticsearch {
                hosts => ["${ELASTICSEARCH_HOSTS}"]
                index => "ycusrrderr-news%{+YYYYMM}"
                user => "${ELASTICSEARCH_APP_LOG_USER}"
                password => "${ELASTICSEARCH_PASSWORD}"
            }
        }
    }
}

You need to provide more context and share some logs that would indicate any issue.

It is not clear what is your issue here.

This topic was automatically closed 28 days after the last reply. New replies are no longer allowed.