I am using Kafka as a queue.
My data is getting into the Kafka cluster fine, and the majority of it is being output to my ES cluster without issue, except for one topic.
The config for the Logstash node consuming messages from the Kafka cluster is:
input {
  kafka {
    zk_connect => "10.1.84.3:2181,10.1.84.4:2181,10.1.84.4:2181"
    topic_id => "topbeat"
  }
}
output {
  stdout { codec => rubydebug }
}
When I run this, I get the following error:
log4j, [2016-03-05T14:39:29.025] WARN: kafka.consumer.ConsumerFetcherManager$LeaderFinderThread: [logstash_OP-01-VM-554-1457206763224-c9f936ab-leader-finder-thread], Failed to find leader for Set([topbeat,1], [topbeat,0])
kafka.common.KafkaException: fetching topic metadata for topics [Set(topbeat)] from broker [ArrayBuffer(id:0,host:10.1.84.9,port:9092, id:12,host:10.1.84.14,port:9092, id:6,host:10.1.84.8,port:9092, id:5,host:10.1.84.7,port:9092, id:10,host:10.1.84.12,port:9092, id:9,host:10.1.84.11,port:9092, id:2,host:10.1.84.4,port:9092, id:4,host:10.1.84.6,port:9092, id:11,host:10.1.84.13,port:9092, id:3,host:10.1.84.5,port:9092)] failed
at kafka.client.ClientUtils$.fetchTopicMetadata(ClientUtils.scala:72)
at kafka.client.ClientUtils$.fetchTopicMetadata(ClientUtils.scala:93)
at kafka.consumer.ConsumerFetcherManager$LeaderFinderThread.doWork(ConsumerFetcherManager.scala:66)
at kafka.utils.ShutdownableThread.run(ShutdownableThread.scala:60)
Caused by: java.lang.ArrayIndexOutOfBoundsException: 1
at kafka.api.TopicMetadata$$anonfun$readFrom$1.apply$mcVI$sp(TopicMetadata.scala:38)
at scala.collection.immutable.Range.foreach$mVc$sp(Range.scala:141)
at kafka.api.TopicMetadata$.readFrom(TopicMetadata.scala:36)
at kafka.api.TopicMetadataResponse$$anonfun$3.apply(TopicMetadataResponse.scala:31)
at kafka.api.TopicMetadataResponse$$anonfun$3.apply(TopicMetadataResponse.scala:31)
at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244)
at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244)
at scala.collection.immutable.Range.foreach(Range.scala:141)
at scala.collection.TraversableLike$class.map(TraversableLike.scala:244)
at scala.collection.AbstractTraversable.map(Traversable.scala:105)
at kafka.api.TopicMetadataResponse$.readFrom(TopicMetadataResponse.scala:31)
at kafka.producer.SyncProducer.send(SyncProducer.scala:114)
at kafka.client.ClientUtils$.fetchTopicMetadata(ClientUtils.scala:58)
... 3 more
The strangest part is that I have two other topics that are being consumed just fine; it is only this one topic that fails. Any assistance would be appreciated.
Thanks
You are right. The best solution is horizontal scaling, but if you have 4 Logstash instances you could add a setting to your kafka input:
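A minimal sketch, assuming the logstash-input-kafka plugin's consumer_threads option and reusing the connection values from the question:

input {
  kafka {
    # zk_connect and topic_id copied from the question
    zk_connect => "10.1.84.3:2181,10.1.84.4:2181,10.1.84.4:2181"
    topic_id => "topbeat"
    # Threads on this one instance; across 4 instances this gives
    # 4 consumer threads total in the same consumer group.
    consumer_threads => 1
  }
}

With the high-level consumer, Kafka spreads the topic's partitions across all threads in the group, so the total thread count across your instances should not exceed the partition count; any extra threads sit idle.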