I'm trying to consume a Kafka topic using Logstash, for indexing by Elasticsearch. The Kafka events are JSON documents.
We recently upgraded our Elastic Stack to 5.1.2.
I believe I was able to consume the topic without problems in 5.0 using the same settings, but that was a while ago, so I may be doing something wrong now that I can't see. This is my config (slightly sanitized):
    input {
      kafka {
        bootstrap_servers => "host1:9092,host2:9092,host3:9092"
        client_id => "logstash-elastic-5-c5"
        group_id => "logstash-elastic-5-g5"
        topics => "trp_v1"
        auto_offset_reset => "earliest"
        # enable_auto_commit => "false"
      }
    }
    filter {
      json {
        source => "message"
      }
      mutate {
        rename => { "@timestamp" => "indexedDatetime" }
        remove_field => [
          "@timestamp",
          "@version",
          "message"
        ]
      }
    }
    output {
      ##stdout { codec => plain }
      stdout { codec => rubydebug }
      elasticsearch {
        hosts => ["host10:9200", "host11:9200", "host12:9200", "host13:9200"]
        action => "index"
        index => "trp-i"
        document_type => "event"
      }
    }
When I run this, no messages are consumed. Nothing appears in the log after "[org.apache.kafka.clients.consumer.internals.ConsumerCoordinator] Setting newly assigned partitions", and in Kafka Manager the consumer shows up immediately with "total lag = 0" for the topic.
This version of the Kafka plugin stores consumer offsets in Kafka itself, so each time I run Logstash against the same topic I increment the group_id. In theory, a group with no committed offsets should then start from the earliest offset for the topic.
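For reference, a stripped-down pipeline that isolates the Kafka input from the filter and Elasticsearch stages might look like the sketch below. The group_id and client_id values are hypothetical fresh names chosen for illustration; the remaining settings are copied from the config above.

```
input {
  kafka {
    bootstrap_servers => "host1:9092,host2:9092,host3:9092"
    # Hypothetical fresh names: a group with no committed offsets
    # is the only case where auto_offset_reset is consulted.
    client_id => "logstash-elastic-5-c6"
    group_id => "logstash-elastic-5-g6"
    topics => ["trp_v1"]
    auto_offset_reset => "earliest"
  }
}
output {
  # Print raw events only, with no json/mutate filter and no Elasticsearch output.
  stdout { codec => rubydebug }
}
```

If this pipeline also prints only newly produced messages, the filter and output stages can probably be ruled out, which would leave the input settings or the plugin itself as the likely cause.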
Any advice?
EDIT: Despite setting auto_offset_reset to "earliest", it doesn't seem to take effect; it behaves as if it were set to "latest". I left Logstash running and then had more entries loaded into the Kafka queue, and those were processed by Logstash. Are my settings wrong in some way, or could this be a bug?