Hi,
I am trying to read log messages from Kafka with Logstash. Here is my Logstash config:
input {
  kafka {
    zk_connect => "kafka:2181"
    group_id => "logstash"
    topic_id => "logstash_logs"
    reset_beginning => false
    consumer_threads => 1
  }
}
output {
  elasticsearch {
    index => "test-2015-08-18"
  }
}
My zookeeper.properties on the Kafka machine looks like this:
dataDir=/tmp/kafka/
clientPort=2181
maxClientCnxns=0
The topic logstash_logs was created with the following command on the Kafka machine:
bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic logstash_logs
When I run Logstash, I get the following error in the console:
log4j, [2015-08-19T10:41:42.315] WARN: kafka.client.ClientUtils$: Fetching topic metadata with correlation id 5 for topics [Set(logstash_logs)] from broker [id:0,host:localhost,port:9092] failed
java.nio.channels.ClosedChannelException
    at kafka.network.BlockingChannel.send(BlockingChannel.scala:100)
    at kafka.producer.SyncProducer.liftedTree1$1(SyncProducer.scala:73)
    at kafka.producer.SyncProducer.kafka$producer$SyncProducer$$doSend(SyncProducer.scala:72)
    at kafka.producer.SyncProducer.send(SyncProducer.scala:113)
    at kafka.client.ClientUtils$.fetchTopicMetadata(ClientUtils.scala:58)
    at kafka.client.ClientUtils$.fetchTopicMetadata(ClientUtils.scala:93)
    at kafka.consumer.ConsumerFetcherManager$LeaderFinderThread.doWork(ConsumerFetcherManager.scala:66)
    at kafka.utils.ShutdownableThread.run(ShutdownableThread.scala:60)
log4j, [2015-08-19T10:41:42.315] WARN: kafka.consumer.ConsumerFetcherManager$LeaderFinderThread: [logstash_logstash-indexer-1439961100662-86d0d679-leader-finder-thread], Failed to find leader for Set([logstash_logs,0])
kafka.common.KafkaException: fetching topic metadata for topics [Set(logstash_logs)] from broker [ArrayBuffer(id:0,host:localhost,port:9092)] failed
    at kafka.client.ClientUtils$.fetchTopicMetadata(ClientUtils.scala:72)
    at kafka.client.ClientUtils$.fetchTopicMetadata(ClientUtils.scala:93)
    at kafka.consumer.ConsumerFetcherManager$LeaderFinderThread.doWork(ConsumerFetcherManager.scala:66)
    at kafka.utils.ShutdownableThread.run(ShutdownableThread.scala:60)
Caused by: java.nio.channels.ClosedChannelException
    at kafka.network.BlockingChannel.send(BlockingChannel.scala:100)
    at kafka.producer.SyncProducer.liftedTree1$1(SyncProducer.scala:73)
    at kafka.producer.SyncProducer.kafka$producer$SyncProducer$$doSend(SyncProducer.scala:72)
    at kafka.producer.SyncProducer.send(SyncProducer.scala:113)
    at kafka.client.ClientUtils$.fetchTopicMetadata(ClientUtils.scala:58)
    ... 3 more
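The warning shows the consumer trying to reach the broker at host:localhost, port:9092, while my config points at the host "kafka" for ZooKeeper. To narrow this down, a small reachability check like the one below could be run from the Logstash machine. This is only a sketch: the helper name can_connect is made up for illustration, and the assumption that the broker also listens on port 9092 on the host "kafka" is mine, not something the log confirms.

```python
import socket


def can_connect(host, port, timeout=3.0):
    """Return True if a TCP connection to host:port succeeds within timeout."""
    try:
        with socket.create_connection((host, port), timeout=timeout):
            return True
    except OSError:
        # Covers refused connections, timeouts, and DNS lookup failures.
        return False


if __name__ == "__main__":
    targets = [
        ("localhost", 9092),  # broker address exactly as printed in the warning
        ("kafka", 9092),      # assumption: broker port on the Kafka machine
        ("kafka", 2181),      # ZooKeeper address from zk_connect
    ]
    for host, port in targets:
        status = "reachable" if can_connect(host, port) else "NOT reachable"
        print(host, port, status)
```

If localhost:9092 is not reachable from the Logstash machine but kafka:9092 is, that would suggest the broker address handed to the consumer is the thing to look at, though this check alone does not prove it.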