I'm running Logstash 6.6.0 with the Kafka input plugin in a Docker image. It does not process any events, yet the consumer group offsets in Kafka are advanced to the latest position. This happens even after I manually reset the offsets to the beginning.
I've tried multiple configurations and searched online, but found nothing that solves the issue.
Please see the logging and configuration below. The consumer offsets start at 0, but after launching Logstash there is no lag left. Unfortunately, the stdout output did not show any of the messages. Any help is most welcome!
Consumer groups before launching Logstash
TOPIC                            PARTITION  CURRENT-OFFSET  LOG-END-OFFSET  LAG  CONSUMER-ID  HOST  CLIENT-ID
resource-infrastructure-staging  2          0               81              81   -            -     -
resource-infrastructure-staging  0          0               63              63   -            -     -
resource-infrastructure-staging  1          0               63              63   -            -     -
Latest version of the Logstash config
input {
  kafka {
    type => "eventdata"
    id => "kafka-2"
    bootstrap_servers => "<host>:9092"
    client_id => "logstash"
    group_id => "logstash"
    auto_offset_reset => "earliest"
    enable_auto_commit => "false"
    topics => ["resource-infrastructure-staging"]
    codec => "json"
    consumer_threads => 1
    max_poll_records => "10"
  }
}
filter { }
output {
  if [type] == "eventdata" {
    stdout { }
    elasticsearch {
      hosts => [ "<host>:9200" ]
      index => "events-%{[@metadata][kafka][topic]}-%{[@metadata][kafka][partition]}-%{[@metadata][kafka][consumer_group]}-%{+YYYY.MM.dd}"
    }
  }
}
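One way to narrow this down is a stripped-down debug pipeline. The following is only a sketch and is not confirmed to fix the problem. It rests on two assumptions about the config above. First, the `[@metadata][kafka][...]` fields used in the index name are only populated when `decorate_events` is enabled on the Kafka input. Second, if the JSON payload already carries its own `type` field, the input's `type` setting will not override it, so the `if [type] == "eventdata"` conditional could silently skip every event. The group name `logstash-debug` is hypothetical and is used so that previously committed offsets do not interfere.

```
input {
  kafka {
    bootstrap_servers => "<host>:9092"
    group_id => "logstash-debug"     # hypothetical fresh group, so reading starts at the earliest offset
    auto_offset_reset => "earliest"
    topics => ["resource-infrastructure-staging"]
    codec => "json"
    decorate_events => true          # adds topic, partition, consumer_group etc. under [@metadata][kafka]
  }
}
output {
  # No conditional here: every consumed event is printed, including @metadata,
  # which shows whether events arrive at all and what their type field contains.
  stdout { codec => rubydebug { metadata => true } }
}
```

If events show up here, the printed `type` and `[@metadata][kafka]` values should indicate whether the conditional or the index pattern in the original output is the part that misbehaves.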
Consumer groups after launching Logstash
TOPIC                            PARTITION  CURRENT-OFFSET  LOG-END-OFFSET  LAG  CONSUMER-ID                                      HOST     CLIENT-ID
resource-infrastructure-staging  0          63              63              0    logstash-0-4ede6b58-950b-4dcf-9587-a8937e55b8cf  /<host>  logstash-0
resource-infrastructure-staging  1          63              63              0    logstash-0-4ede6b58-950b-4dcf-9587-a8937e55b8cf  /<host>  logstash-0
resource-infrastructure-staging  2          81              81              0    logstash-0-4ede6b58-950b-4dcf-9587-a8937e55b8cf  /<host>  logstash-0
All Logstash Kafka logging
[2019-02-12T16:58:46,713][INFO ][org.apache.kafka.common.utils.AppInfoParser] Kafka version : 2.1.0
[2019-02-12T16:58:46,713][INFO ][org.apache.kafka.common.utils.AppInfoParser] Kafka commitId : eec43959745f444f
[2019-02-12T16:58:47,381][INFO ][org.apache.kafka.clients.Metadata] Cluster ID: W-4kqDnHRI6-jvwlXuRmjA
[2019-02-12T16:58:47,385][INFO ][org.apache.kafka.clients.consumer.internals.AbstractCoordinator] [Consumer clientId=logstash-0, groupId=logstash] Discovered group coordinator 10.11.33.203:9092 (id: 2147483646 rack: null)
[2019-02-12T16:58:47,396][INFO ][org.apache.kafka.clients.consumer.internals.ConsumerCoordinator] [Consumer clientId=logstash-0, groupId=logstash] Revoking previously assigned partitions []
[2019-02-12T16:58:47,400][INFO ][org.apache.kafka.clients.consumer.internals.AbstractCoordinator] [Consumer clientId=logstash-0, groupId=logstash] (Re-)joining group
[2019-02-12T16:58:50,472][INFO ][org.apache.kafka.clients.consumer.internals.AbstractCoordinator] [Consumer clientId=logstash-0, groupId=logstash] Successfully joined group with generation 23
[2019-02-12T16:58:50,475][INFO ][org.apache.kafka.clients.consumer.internals.ConsumerCoordinator] [Consumer clientId=logstash-0, groupId=logstash] Setting newly assigned partitions [resource-infrastructure-staging-0, resource-infrastructure-staging-1, resource-infrastructure-staging-2]