In Logstash 6.8, the Kafka metadata for partition 0 cannot be read, while the metadata for the other partitions can. Sample configuration:
# Pipeline overview: two Kafka inputs consume the topic "ycUsrRdNews" from two
# separately configured bootstrap server lists, the message is parsed as JSON,
# and the resulting events are indexed into Elasticsearch.
input {
# First Kafka source; servers come from the KAFKA_BOOTSTRAP_SERVERS variable.
kafka {
client_id => "ycUsrRdNews"
consumer_threads => 4
bootstrap_servers => "${KAFKA_BOOTSTRAP_SERVERS}"
topics => "ycUsrRdNews"
group_id => "ycUsrRdNewsConsumerGroup"
auto_offset_reset => "latest"
# The filter below reads [@metadata][kafka][...]; per the Kafka input plugin
# documentation those fields are only populated when decorate_events is enabled.
decorate_events => true
type => "ycUsrRdNews"
}
# Second Kafka source: same topic name and type, but a different server list
# (KAFKA_BOOTSTRAP_SERVERS_2), client_id and consumer group.
kafka {
client_id => "ycUsrRdNews2"
consumer_threads => 4
bootstrap_servers => "${KAFKA_BOOTSTRAP_SERVERS_2}"
topics => "ycUsrRdNews"
group_id => "ycUsrRdNewsConsumerGroup2"
auto_offset_reset => "latest"
decorate_events => true
type => "ycUsrRdNews"
}
}
filter {
if [type] == "ycUsrRdNews" {
# Parse the raw Kafka payload held in "message" as JSON; no target is set here.
json {
source => "message"
skip_on_invalid_json => true
# add_field is a common filter option that is documented to apply only when the
# filter succeeds, so presumably "kafkaInfo" is missing on events whose message
# is not valid JSON -- TODO confirm against the affected events.
add_field => {
# Builds "<topic>-<consumer_group>-<partition>-<offset>-<timestamp>" from the
# Kafka metadata attached by the input.
# NOTE(review): a sprintf reference that cannot be resolved is left in the
# output as literal text; check whether that is what is observed for partition 0.
"kafkaInfo" => "%{[@metadata][kafka][topic]}-%{[@metadata][kafka][consumer_group]}-%{[@metadata][kafka][partition]}-%{[@metadata][kafka][offset]}-%{[@metadata][kafka][timestamp]}"
}
}
}
}
output {
if [type] == "ycUsrRdNews" {
#stdout { codec => rubydebug }
# Every event of this type is written to an index whose name carries a
# year-month-day suffix.
elasticsearch {
hosts => ["${ELASTICSEARCH_HOSTS}"]
index => "ycusrrd-news%{+YYYYMMdd}"
user => "${ELASTICSEARCH_APP_LOG_USER}"
password => "${ELASTICSEARCH_PASSWORD}"
}
# Events whose "level" field equals "ERROR" are additionally written to a second
# index whose name carries a year-month suffix.
if [level] == "ERROR" {
elasticsearch {
hosts => ["${ELASTICSEARCH_HOSTS}"]
index => "ycusrrderr-news%{+YYYYMM}"
user => "${ELASTICSEARCH_APP_LOG_USER}"
password => "${ELASTICSEARCH_PASSWORD}"
}
}
}
}
You need to provide more context and share some logs that would indicate any issue.
It is not clear what your issue is here.
This topic was automatically closed 28 days after the last reply. New replies are no longer allowed.