Logstash subscribe Kafka message gets ClosedChannelException


(Jason Zheng) #1

Hi All,

I would like to get message from kafka but get ClosedChannelException (loops)

log4j, [2015-10-01T15:44:28.569]  WARN: kafka.consumer.ConsumerFetcherManager$LeaderFinderThread: [logstash_logstashsubkafka-1443685462254-3d51c80d-leader-finder-thread], Failed to find leader for Set([neptunus,0])
kafka.common.KafkaException: fetching topic metadata for topics [Set(neptunus)] from broker [ArrayBuffer(id:1,host:kafkahost,port:9093)] failed
        at kafka.client.ClientUtils$.fetchTopicMetadata(ClientUtils.scala:72)
        at kafka.client.ClientUtils$.fetchTopicMetadata(ClientUtils.scala:93)
        at kafka.consumer.ConsumerFetcherManager$LeaderFinderThread.doWork(ConsumerFetcherManager.scala:66)
        at kafka.utils.ShutdownableThread.run(ShutdownableThread.scala:60)
Caused by: java.nio.channels.ClosedChannelException
        at kafka.network.BlockingChannel.send(BlockingChannel.scala:100)
        at kafka.producer.SyncProducer.liftedTree1$1(SyncProducer.scala:73)
        at kafka.producer.SyncProducer.kafka$producer$SyncProducer$$doSend(SyncProducer.scala:72)
        at kafka.producer.SyncProducer.send(SyncProducer.scala:113)
        at kafka.client.ClientUtils$.fetchTopicMetadata(ClientUtils.scala:58)
        ... 3 more

Kafka

    ./kafka-console-consumer.sh --zookeeper zookeeper4kafka:2181 --topic neptunus --from-beginning
    {"message":"helloWorld!!","@version":"1","@timestamp":"2015-10-01T04:14:36.579Z","host":"logstashpubkafka","path":"/var/log/kafka/test","tst"}
    {"message":"Test World","@version":"1","@timestamp":"2015-10-01T04:20:34.966Z","host":"logstashpubkafka","path":"/var/log/kafka/test","typ"}
    {"message":"helloWorld!!","@version":"1","@timestamp":"2015-10-01T04:20:34.968Z","host":"logstashpubkafka","path":"/var/log/kafka/test","tst"}
    {"message":"today is my day","@version":"1","@timestamp":"2015-10-01T05:42:04.827Z","host":"logstashpubkafka","path":"/var/log/kafka/test""test"}

Logstash (subscribe)

input {
        kafka {
                topic_id => "neptunus"
                zk_connect => "zookeeper4kafka:2181"
                type => "test"
        }
}

output {
        stdout { codec => rubydebug }
}

Any helps

Jason


(Joe Lawson) #2

Make sure the Kafka brokers have their host.name property set to something
resolvable by the consumer.


(Jason Zheng) #3

Hi Joe,

Thanks for replying, I solved the problem by your recommended

Jason


(system) #4