We are using Logstash version logstash-2.3.3.
Right now we use the kafka input to read messages from a broker running Kafka 0.9:
```
input {
  kafka {
    topic_id   => "{{kafka_log_consumer_topic_id}}"
    group_id   => "{{kafka_consumer_group_id}}"
    zk_connect => "{{zk_consumer_connect_url}}"
  }
}

filter {
  if [level] == "ERROR" or [level] == "WARN" or [level] == "FATAL" {
    mutate {
      add_field => {
        "thrownMessage"   => "%{[thrown][message]}"
        "applicationName" => "%{[contextMap][applicationName]}"
        "errorId"         => "%{[contextMap][errorId]}"
      }
    }
    uuid {
      target => "logEventId"
    }
  }
}

output {
  if [level] == "ERROR" or [level] == "WARN" or [level] == "FATAL" {
    elasticsearch {
      hosts => "{{elastic_search_hosts}}"
    }
    kafka {
      topic_id          => "{{kafka_error_publish_topic}}"
      bootstrap_servers => "{{kafka_error_publish_host}}"
    }
  } else {
    elasticsearch {
      hosts => "{{elastic_search_hosts}}"
    }
  }
}
```
Sometimes our ZooKeeper goes down, and when it comes back up after a while, Logstash does not reconnect.
Does the Logstash kafka input support the new Kafka consumer API, which does not depend on ZooKeeper?
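For reference, this is roughly the configuration we would hope to use if a newer version of the kafka input plugin supports the new consumer API. This is only a sketch: the option names `bootstrap_servers` and `topics` are assumed from what we have seen of the newer logstash-input-kafka (3.x+) plugin, and `{{kafka_broker_list}}` is a placeholder for our broker host:port list:

```
input {
  kafka {
    # New consumer API: talks directly to the brokers,
    # so no zk_connect / ZooKeeper dependency.
    bootstrap_servers => "{{kafka_broker_list}}"
    # Newer plugin versions take an array of topics
    # instead of a single topic_id.
    topics   => ["{{kafka_log_consumer_topic_id}}"]
    group_id => "{{kafka_consumer_group_id}}"
  }
}
```

If that is the right direction, we would also like to know which plugin (or Logstash) version we need to upgrade to for this to work against Kafka 0.9.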