kafka 0 partition metadata cannot be read in logstash6.8, other partitions can. Sample configuration:
input {
kafka {
client_id => "ycUsrRdNews"
consumer_threads => 4
bootstrap_servers => "${KAFKA_BOOTSTRAP_SERVERS}"
topics => "ycUsrRdNews"
group_id => "ycUsrRdNewsConsumerGroup"
auto_offset_reset => "latest"
decorate_events => true
type => "ycUsrRdNews"
}
kafka {
client_id => "ycUsrRdNews2"
consumer_threads => 4
bootstrap_servers => "${KAFKA_BOOTSTRAP_SERVERS_2}"
topics => "ycUsrRdNews"
group_id => "ycUsrRdNewsConsumerGroup2"
auto_offset_reset => "latest"
decorate_events => true
type => "ycUsrRdNews"
}
}
filter {
if [type] == "ycUsrRdNews" {
json {
source => "message"
skip_on_invalid_json => true
add_field => {
"kafkaInfo" => "%{[@metadata][kafka][topic]}-%{[@metadata][kafka][consumer_group]}-%{[@metadata][kafka][partition]}-%{[@metadata][kafka][offset]}-%{[@metadata][kafka][timestamp]}"
}
}
}
}
output {
if [type] == "ycUsrRdNews" {
#stdout { codec => rubydebug }
elasticsearch {
hosts => ["${ELASTICSEARCH_HOSTS}"]
index => "ycusrrd-news%{+YYYYMMdd}"
user => "${ELASTICSEARCH_APP_LOG_USER}"
password => "${ELASTICSEARCH_PASSWORD}"
}
if [level] == "ERROR" {
elasticsearch {
hosts => ["${ELASTICSEARCH_HOSTS}"]
index => "ycusrrderr-news%{+YYYYMM}"
user => "${ELASTICSEARCH_APP_LOG_USER}"
password => "${ELASTICSEARCH_PASSWORD}"
}
}
}
}
You need to provide more context and share some logs that would indicate any issue.
It is not clear what is your issue here.