Logstash (1.5.3) is not able to connect to kafka


#1

Hi,

I am trying to read some log messages from kafka. Below is my logstash config:-

input {
    kafka {
        zk_connect => "kafka:2181"
        group_id => "logstash"
        topic_id => "logstash_logs"
        reset_beginning => false
        consumer_threads => 1
    }
}

output {
  elasticsearch {
    index => "test-2015-08-18"
  }
}

My zookeeper.properties in kafka look like below:-

dataDir=/tmp/kafka/
clientPort=2181
maxClientCnxns=0

The topic logstash_logs was created with the below command in kafka machine:-

bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic logstash_logs

On running logstash I am getting the below error in console:-

log4j, [2015-08-19T10:41:42.315]  WARN: kafka.client.ClientUtils$: Fetching topic metadata with correlation id 5 for topics [Set(logstash_logs)] from broker [id:0,host:localhost,port:9092] failed
java.nio.channels.ClosedChannelException
    at kafka.network.BlockingChannel.send(BlockingChannel.scala:100)
    at kafka.producer.SyncProducer.liftedTree1$1(SyncProducer.scala:73)
    at kafka.producer.SyncProducer.kafka$producer$SyncProducer$$doSend(SyncProducer.scala:72)
    at kafka.producer.SyncProducer.send(SyncProducer.scala:113)
    at kafka.client.ClientUtils$.fetchTopicMetadata(ClientUtils.scala:58)
    at kafka.client.ClientUtils$.fetchTopicMetadata(ClientUtils.scala:93)
    at kafka.consumer.ConsumerFetcherManager$LeaderFinderThread.doWork(ConsumerFetcherManager.scala:66)
    at kafka.utils.ShutdownableThread.run(ShutdownableThread.scala:60)
log4j, [2015-08-19T10:41:42.315]  WARN: kafka.consumer.ConsumerFetcherManager$LeaderFinderThread: [logstash_logstash-indexer-1439961100662-86d0d679-leader-finder-thread], Failed to find leader for Set([logstash_logs,0])
kafka.common.KafkaException: fetching topic metadata for topics [Set(logstash_logs)] from broker [ArrayBuffer(id:0,host:localhost,port:9092)] failed
    at kafka.client.ClientUtils$.fetchTopicMetadata(ClientUtils.scala:72)
    at kafka.client.ClientUtils$.fetchTopicMetadata(ClientUtils.scala:93)
    at kafka.consumer.ConsumerFetcherManager$LeaderFinderThread.doWork(ConsumerFetcherManager.scala:66)
    at kafka.utils.ShutdownableThread.run(ShutdownableThread.scala:60)
Caused by: java.nio.channels.ClosedChannelException
    at kafka.network.BlockingChannel.send(BlockingChannel.scala:100)
    at kafka.producer.SyncProducer.liftedTree1$1(SyncProducer.scala:73)
    at kafka.producer.SyncProducer.kafka$producer$SyncProducer$$doSend(SyncProducer.scala:72)
    at kafka.producer.SyncProducer.send(SyncProducer.scala:113)
    at kafka.client.ClientUtils$.fetchTopicMetadata(ClientUtils.scala:58)
    ... 3 more

(Joe Lawson) #2

Try changing zk_connect => "kafka:2181" to zk_connect => "localhost:2181"


#3

Kafka/zookeeper not running in the same machine as the logstash. It is running in a different macine. Also I am able to telnet to kafka on port 2181 and also I am able to connect to kafka from the kafka web console. So, I don't think there is any network connectivity issue between both the machines.


#5

Setting advertised.host.name=kafka in server.properties file solved the issue.


(Joe Lawson) #6

Ah yes, that is a classic gotcha. Sorry I didn't catch it before.


#7

Thanks Joe :slight_smile:


(shyam) #8

Hi,
I am having similar problem. where my producer runs on AWS machine and unable to post the messages on topic.

i set up the advertised.host.name=mariner.twin.com
where mariner.twin.com is my kafka cluster public host name.

What else need to be set it up ?
Please help me.

thank you,


(Christian Dahlqvist) #9

Given that this thread is 5 months old, it is probably better if you create your own thread.


(system) #10