I am using Kafka as a queue.
Data is reaching the Kafka cluster without problems, and most of it is being output to my ES cluster correctly. The exception is one topic.
The config for the Logstash node that consumes messages from the Kafka cluster is:
input {
  kafka {
    zk_connect => "10.1.84.3:2181,10.1.84.4:2181,10.1.84.4:2181"
    topic_id => "topbeat"
  }
}
output {
  stdout { codec => rubydebug }
}
When I run this, I get the following error:
log4j, [2016-03-05T14:39:29.025] WARN: kafka.consumer.ConsumerFetcherManager$LeaderFinderThread: [logstash_OP-01-VM-554-1457206763224-c9f936ab-leader-finder-thread], Failed to find leader for Set([topbeat,1], [topbeat,0])
kafka.common.KafkaException: fetching topic metadata for topics [Set(topbeat)] from broker [ArrayBuffer(id:0,host:10.1.84.9,port:9092, id:12,host:10.1.84.14,port:9092, id:6,host:10.1.84.8,port:9092, id:5,host:10.1.84.7,port:9092, id:10,host:10.1.84.12,port:9092, id:9,host:10.1.84.11,port:9092, id:2,host:10.1.84.4,port:9092, id:4,host:10.1.84.6,port:9092, id:11,host:10.1.84.13,port:9092, id:3,host:10.1.84.5,port:9092)] failed
    at kafka.client.ClientUtils$.fetchTopicMetadata(ClientUtils.scala:72)
    at kafka.client.ClientUtils$.fetchTopicMetadata(ClientUtils.scala:93)
    at kafka.consumer.ConsumerFetcherManager$LeaderFinderThread.doWork(ConsumerFetcherManager.scala:66)
    at kafka.utils.ShutdownableThread.run(ShutdownableThread.scala:60)
Caused by: java.lang.ArrayIndexOutOfBoundsException: 1
    at kafka.api.TopicMetadata$$anonfun$readFrom$1.apply$mcVI$sp(TopicMetadata.scala:38)
    at scala.collection.immutable.Range.foreach$mVc$sp(Range.scala:141)
    at kafka.api.TopicMetadata$.readFrom(TopicMetadata.scala:36)
    at kafka.api.TopicMetadataResponse$$anonfun$3.apply(TopicMetadataResponse.scala:31)
    at kafka.api.TopicMetadataResponse$$anonfun$3.apply(TopicMetadataResponse.scala:31)
    at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244)
    at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244)
    at scala.collection.immutable.Range.foreach(Range.scala:141)
    at scala.collection.TraversableLike$class.map(TraversableLike.scala:244)
    at scala.collection.AbstractTraversable.map(Traversable.scala:105)
    at kafka.api.TopicMetadataResponse$.readFrom(TopicMetadataResponse.scala:31)
    at kafka.producer.SyncProducer.send(SyncProducer.scala:114)
    at kafka.client.ClientUtils$.fetchTopicMetadata(ClientUtils.scala:58)
    ... 3 more
The strangest part is that two other topics are being consumed just fine; only this one topic fails. Any assistance would be appreciated.
Thanks