Hi All,
I would like to get message from kafka but get ClosedChannelException (loops)
log4j, [2015-10-01T15:44:28.569] WARN: kafka.consumer.ConsumerFetcherManager$LeaderFinderThread: [logstash_logstashsubkafka-1443685462254-3d51c80d-leader-finder-thread], Failed to find leader for Set([neptunus,0])
kafka.common.KafkaException: fetching topic metadata for topics [Set(neptunus)] from broker [ArrayBuffer(id:1,host:kafkahost,port:9093)] failed
at kafka.client.ClientUtils$.fetchTopicMetadata(ClientUtils.scala:72)
at kafka.client.ClientUtils$.fetchTopicMetadata(ClientUtils.scala:93)
at kafka.consumer.ConsumerFetcherManager$LeaderFinderThread.doWork(ConsumerFetcherManager.scala:66)
at kafka.utils.ShutdownableThread.run(ShutdownableThread.scala:60)
Caused by: java.nio.channels.ClosedChannelException
at kafka.network.BlockingChannel.send(BlockingChannel.scala:100)
at kafka.producer.SyncProducer.liftedTree1$1(SyncProducer.scala:73)
at kafka.producer.SyncProducer.kafka$producer$SyncProducer$$doSend(SyncProducer.scala:72)
at kafka.producer.SyncProducer.send(SyncProducer.scala:113)
at kafka.client.ClientUtils$.fetchTopicMetadata(ClientUtils.scala:58)
... 3 more
Kafka
./kafka-console-consumer.sh --zookeeper zookeeper4kafka:2181 --topic neptunus --from-beginning
{"message":"helloWorld!!","@version":"1","@timestamp":"2015-10-01T04:14:36.579Z","host":"logstashpubkafka","path":"/var/log/kafka/test","tst"}
{"message":"Test World","@version":"1","@timestamp":"2015-10-01T04:20:34.966Z","host":"logstashpubkafka","path":"/var/log/kafka/test","typ"}
{"message":"helloWorld!!","@version":"1","@timestamp":"2015-10-01T04:20:34.968Z","host":"logstashpubkafka","path":"/var/log/kafka/test","tst"}
{"message":"today is my day","@version":"1","@timestamp":"2015-10-01T05:42:04.827Z","host":"logstashpubkafka","path":"/var/log/kafka/test""test"}
Logstash (subscribe)
input {
kafka {
topic_id => "neptunus"
zk_connect => "zookeeper4kafka:2181"
type => "test"
}
}
output {
stdout { codec => rubydebug }
}
Any helps
Jason