I am trying to run a small pool of Logstash instances with at-most-once delivery semantics against a Kafka 0.10.2.1 cluster. The kafka input is using the new-style consumer group, whose offsets are stored as metadata in the brokers rather than tracked in ZooKeeper. I have set a short auto-commit interval so that committed offsets stay current when partitions move between consumers.
input {
  kafka {
    bootstrap_servers => "${KAFKA_BOOTSTRAP_SERVERS}"
    topics => ["proofpoint", "fortigate", "windows"]
    group_id => "logstash"
    auto_offset_reset => "latest"
    auto_commit_interval_ms => "50"
    codec => "json"
    consumer_threads => 1
    partition_assignment_strategy => "org.apache.kafka.clients.consumer.RoundRobinAssignor"
  }
}
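For reference, the group's broker-side committed offsets can be inspected with Kafka's own tooling (a sketch, assuming the CLI scripts from the 0.10.2.1 distribution are on the path and KAFKA_BOOTSTRAP_SERVERS expands to a reachable host:port list):

# Describe the "logstash" consumer group: shows per-partition
# CURRENT-OFFSET, LOG-END-OFFSET, and LAG as stored in the brokers.
# (Older 0.10.x builds may also require the --new-consumer flag.)
bin/kafka-consumer-groups.sh \
  --bootstrap-server "$KAFKA_BOOTSTRAP_SERVERS" \
  --describe --group logstash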
My elasticsearch output config:
output {
  elasticsearch {
    hosts => ["${ELASTICSEARCH_CLIENT_HOST}"]
    index => "@%{vtype}-nad-v2-%{date}"
    flush_size => 1000
    idle_flush_time => 10
    sniffing => true
    sniffing_delay => 30
  }
}
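To compare what actually landed against what Kafka holds, the per-index document counts can be pulled from Elasticsearch and set against the LOG-END-OFFSET totals above (a sketch, assuming ELASTICSEARCH_CLIENT_HOST expands to a plain host:port and the indices follow the @<vtype>-nad-v2-<date> pattern from the config):

# Per-index doc counts for the indices this pipeline writes;
# compare the totals per topic against the Kafka log-end offsets.
curl -s "http://${ELASTICSEARCH_CLIENT_HOST}/_cat/indices/@*-nad-v2-*?v&h=index,docs.count"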
Now I see in Elasticsearch that I have roughly 5x the data compared to what is in Kafka. What am I doing wrong?