Kafka.consumer.RangeAssignor: No broker partitions consumed by consumer thread logstash_logstash-indexer


#1

I have an ELK setup in which one Logstash instance pushes data to Kafka and another Logstash instance pulls data from Kafka.

Below is my Kafka input config:

input {
    kafka {
        zk_connect => "kafka:2181"
        group_id => "logstash"
        topic_id => "logstash_logs"
        reset_beginning => false
        consumer_threads => 3
    }
}

I have gone through this issue, and I have 3 partitions for my Logstash topic.

After starting Logstash I see the warnings below, and Logstash is not pulling any data from Kafka:

'[DEPRECATED] use `require 'concurrent'` instead of `require 'concurrent_ruby'`
log4j, [2015-10-27T00:11:50.471]  WARN: kafka.consumer.RangeAssignor: No broker partitions consumed by consumer thread logstash_logstash-indexer-1445884909915-d6a99924-2 for topic logstash_logs
log4j, [2015-10-27T00:11:50.471]  WARN: kafka.consumer.RangeAssignor: No broker partitions consumed by consumer thread logstash_logstash-indexer-1445884909915-d6a99924-0 for topic logstash_logs
log4j, [2015-10-27T00:11:50.471]  WARN: kafka.consumer.RangeAssignor: No broker partitions consumed by consumer thread logstash_logstash-indexer-1445884909915-d6a99924-1 for topic logstash_logs
Logstash startup completed

Can someone let me know what else can go wrong?

Environment -

  • Logstash - 1.5.3
  • Kafka - 0.8.2.1

(Joe Lawson) #2

By default, a newly created consumer group starts at the largest (newest)
offset, so it only sees messages produced after the group was created. If
you want older data, you have to remove the consumer group and then set the
offset default to smallest for the initial pull. You can trigger that by
starting one logstash-kafka-input instance with the reset_beginning setting:
https://www.elastic.co/guide/en/logstash/current/plugins-inputs-kafka.html#plugins-inputs-kafka-reset_beginning


#3

Thanks @Joe_Lawson. Do you mean I should restart the Logstash instance with reset_beginning set to true?


(Joe Lawson) #4

Yeah, update your logstash config with reset_beginning => true and it'll
reset the consumer group offsets in ZooKeeper. After it does that, stop the
logstash service and take the setting out again. Otherwise it would reset
the offsets on every start, which you don't want.


#5

@Joe_Lawson - As you suggested, I set reset_beginning => true and started Logstash. When I saw the message "Logstash startup completed", I stopped Logstash and restarted it with reset_beginning => false. It worked, but after some time I observed that Logstash was again not pulling messages from Kafka. On restarting, I saw the same warning message:

WARN: kafka.consumer.RangeAssignor: No broker partitions consumed by consumer thread logstash_logstash-indexer-1445884909915-d6a99924-2 for topic logstash_logs

Do you have any idea why Logstash suddenly stops pulling messages from Kafka?


(Joe Lawson) #6

The warning you are reporting implies that there are fewer partitions than
consumer threads. Using the Kafka tools, run

bin/kafka-topics.sh --describe --topic logstash_logs --zookeeper kafka:2181

That should tell you how many partitions are on the topic. Also
set consumer_threads to 1. You don't gain anything setting it larger.
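
To illustrate why the warning shows up when threads outnumber partitions, here is a minimal Python sketch of range-style assignment. It is a simplified model of the arithmetic kafka.consumer.RangeAssignor uses in Kafka 0.8.x as far as I understand it, not the actual Scala code. The function name range_assign and the thread IDs are made up for illustration.

```python
def range_assign(partitions, thread_ids):
    """Split one topic's partitions over consumer threads, range style.

    Simplified model: threads are sorted by ID, each gets a contiguous
    slice, and the first (partitions % threads) threads get one extra.
    """
    threads = sorted(thread_ids)
    if not threads:
        return {}
    per_thread, extra = divmod(len(partitions), len(threads))
    assignment = {}
    for pos, tid in enumerate(threads):
        start = per_thread * pos + min(pos, extra)
        count = per_thread + (1 if pos < extra else 0)
        # An empty slice here corresponds to the
        # "No broker partitions consumed by consumer thread" warning.
        assignment[tid] = partitions[start:start + count]
    return assignment


if __name__ == "__main__":
    # Hypothetical thread IDs, used only for illustration.
    for threads in (["c1-0", "c1-1", "c1-2"],
                    ["c1-0", "c1-1", "c1-2", "c2-0"]):
        for tid, parts in range_assign([0, 1, 2], threads).items():
            print(tid, parts or "no partitions (would log the warning)")
        print()
```

With three partitions and three threads every thread gets one partition. Adding a fourth thread to the same group leaves the thread that sorts last with nothing to consume.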


#7

@Joe_Lawson - I have 3 partitions, which I verified. Below is the output:

root@kafka:/opt/kafka_2.10-0.8.2.1# bin/kafka-topics.sh --describe --topic logstash_logs --zookeeper localhost:2181
Topic:logstash_logs    PartitionCount:3    ReplicationFactor:1    Configs:
    Topic: logstash_logs    Partition: 0    Leader: 0    Replicas: 0    Isr: 0
    Topic: logstash_logs    Partition: 1    Leader: 0    Replicas: 0    Isr: 0
    Topic: logstash_logs    Partition: 2    Leader: 0    Replicas: 0    Isr: 0

As per the documentation, the number of threads should be equal to the number of partitions. When should we set this to more than 1, then?


#8

I created a new topic, logstash_logs2, but I am seeing the same problem. I started Logstash with --verbose. Below are the logs:

log4j, [2015-11-01T19:32:37.452]  INFO: org.apache.zookeeper.ClientCnxn: Session establishment complete on server kafka/172.16.84.188:2181, sessionid = 0x150a82d75a55c66, negotiated timeout = 6000
log4j, [2015-11-01T19:32:37.463]  INFO: org.I0Itec.zkclient.ZkClient: zookeeper state changed (SyncConnected)
log4j, [2015-11-01T19:32:37.473]  INFO: kafka.consumer.ZookeeperConsumerConnector: [logstash_logstash-indexer-1446386557328-9606786c], starting auto committer every 1000 ms
log4j, [2015-11-01T19:32:37.511]  INFO: kafka.consumer.ZookeeperConsumerConnector: [logstash_logstash-indexer-1446386557328-9606786c], begin registering consumer logstash_logstash-indexer-1446386557328-9606786c in ZK
log4j, [2015-11-01T19:32:37.584]  INFO: kafka.consumer.ZookeeperConsumerConnector: [logstash_logstash-indexer-1446386557328-9606786c], end registering consumer logstash_logstash-indexer-1446386557328-9606786c in ZK
log4j, [2015-11-01T19:32:37.598]  INFO: kafka.consumer.ZookeeperConsumerConnector: [logstash_logstash-indexer-1446386557328-9606786c], starting watcher executor thread for consumer logstash_logstash-indexer-1446386557328-9606786c
log4j, [2015-11-01T19:32:37.663]  INFO: kafka.consumer.ZookeeperConsumerConnector: [logstash_logstash-indexer-1446386557328-9606786c], begin rebalancing consumer logstash_logstash-indexer-1446386557328-9606786c try #0
log4j, [2015-11-01T19:32:37.875]  INFO: kafka.consumer.ConsumerFetcherManager: [ConsumerFetcherManager-1446386557468] Stopping leader finder thread
log4j, [2015-11-01T19:32:37.875]  INFO: kafka.consumer.ConsumerFetcherManager: [ConsumerFetcherManager-1446386557468] Stopping all fetchers
log4j, [2015-11-01T19:32:37.878]  INFO: kafka.consumer.ConsumerFetcherManager: [ConsumerFetcherManager-1446386557468] All connections stopped
log4j, [2015-11-01T19:32:37.878]  INFO: kafka.consumer.ZookeeperConsumerConnector: [logstash_logstash-indexer-1446386557328-9606786c], Cleared all relevant queues for this fetcher
log4j, [2015-11-01T19:32:37.880]  INFO: kafka.consumer.ZookeeperConsumerConnector: [logstash_logstash-indexer-1446386557328-9606786c], Cleared the data chunks in all the consumer message iterators
log4j, [2015-11-01T19:32:37.881]  INFO: kafka.consumer.ZookeeperConsumerConnector: [logstash_logstash-indexer-1446386557328-9606786c], Committing all offsets after clearing the fetcher queues
log4j, [2015-11-01T19:32:37.883]  INFO: kafka.consumer.ZookeeperConsumerConnector: [logstash_logstash-indexer-1446386557328-9606786c], Releasing partition ownership
log4j, [2015-11-01T19:32:37.958]  INFO: kafka.consumer.RangeAssignor: Consumer logstash_logstash-indexer-1446386557328-9606786c rebalancing the following partitions: ArrayBuffer(0, 1, 2) for topic logstash_logs2 with consumers: List(logstash_logstash-indexer-1446358311049-e6b11928-0, logstash_logstash-indexer-1446358311049-e6b11928-1, logstash_logstash-indexer-1446358311049-e6b11928-2, logstash_logstash-indexer-1446358341201-b911cc94-0, logstash_logstash-indexer-1446358341201-b911cc94-1, logstash_logstash-indexer-1446358341201-b911cc94-2, logstash_logstash-indexer-1446364131441-49f4f931-0, logstash_logstash-indexer-1446364131441-49f4f931-1, logstash_logstash-indexer-1446364131441-49f4f931-2, logstash_logstash-indexer-1446386557328-9606786c-0)
log4j, [2015-11-01T19:32:37.960]  WARN: kafka.consumer.RangeAssignor: No broker partitions consumed by consumer thread logstash_logstash-indexer-1446386557328-9606786c-0 for topic logstash_logs2
log4j, [2015-11-01T19:32:37.998]  INFO: kafka.consumer.ZookeeperConsumerConnector: [logstash_logstash-indexer-1446386557328-9606786c], Consumer logstash_logstash-indexer-1446386557328-9606786c selected partitions : 
log4j, [2015-11-01T19:32:38.001]  INFO: kafka.consumer.ZookeeperConsumerConnector: [logstash_logstash-indexer-1446386557328-9606786c], end rebalancing consumer logstash_logstash-indexer-1446386557328-9606786c try #0
log4j, [2015-11-01T19:32:38.004]  INFO: kafka.consumer.ConsumerFetcherManager$LeaderFinderThread: [logstash_logstash-indexer-1446386557328-9606786c-leader-finder-thread], Starting 

I tried with consumer_threads => 1, but it did not help.


(Joe Lawson) #9

How many logstash processes do you have connecting to those brokers that are using the same consumer group ID?

From the following snippet, it looks like you have four different logstash consumers hitting that topic, all in the same group. Three consumers have three threads each and one has a single thread. That makes 10 consumer threads for only three partitions, so 7 of them get no partition and log this warning. Within a consumer group, only as many consumer threads as there are partitions can be assigned work.

log4j, [2015-11-01T19:32:37.958]  INFO: kafka.consumer.RangeAssignor: Consumer logstash_logstash-indexer-1446386557328-9606786c rebalancing the following partitions: ArrayBuffer(0, 1, 2) for topic logstash_logs2 with consumers: List(logstash_logstash-indexer-1446358311049-e6b11928-0, logstash_logstash-indexer-1446358311049-e6b11928-1, logstash_logstash-indexer-1446358311049-e6b11928-2, logstash_logstash-indexer-1446358341201-b911cc94-0, logstash_logstash-indexer-1446358341201-b911cc94-1, logstash_logstash-indexer-1446358341201-b911cc94-2, logstash_logstash-indexer-1446364131441-49f4f931-0, logstash_logstash-indexer-1446364131441-49f4f931-1, logstash_logstash-indexer-1446364131441-49f4f931-2, logstash_logstash-indexer-1446386557328-9606786c-0)
log4j, [2015-11-01T19:32:37.960]  WARN: kafka.consumer.RangeAssignor: No broker partitions consumed by consumer thread logstash_logstash-indexer-1446386557328-9606786c-0 for topic logstash_logs2
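
The same count can be pulled out of the rebalance line mechanically. Below is a rough Python sketch that does so. It assumes each consumer thread ID is the consumer process ID plus a "-<thread number>" suffix, which is inferred from the log above rather than taken from Kafka's documentation. The function name summarize_rebalance is hypothetical, and SAMPLE_LINE is rebuilt from the IDs in that log line (timestamp prefix omitted).

```python
import re
from collections import Counter

# Consumer processes and thread counts as they appear in the log line above.
PROCESSES = [
    ("logstash_logstash-indexer-1446358311049-e6b11928", 3),
    ("logstash_logstash-indexer-1446358341201-b911cc94", 3),
    ("logstash_logstash-indexer-1446364131441-49f4f931", 3),
    ("logstash_logstash-indexer-1446386557328-9606786c", 1),
]
THREAD_IDS = [f"{proc}-{n}" for proc, count in PROCESSES for n in range(count)]
SAMPLE_LINE = (
    "INFO: kafka.consumer.RangeAssignor: Consumer "
    "logstash_logstash-indexer-1446386557328-9606786c rebalancing the following "
    "partitions: ArrayBuffer(0, 1, 2) for topic logstash_logs2 with consumers: "
    "List(" + ", ".join(THREAD_IDS) + ")"
)


def summarize_rebalance(log_line):
    """Return (partition_count, Counter of threads per consumer process)."""
    parts = re.search(r"partitions: ArrayBuffer\((.*?)\)", log_line)
    consumers = re.search(r"with consumers: List\((.*?)\)", log_line)
    if parts is None or consumers is None:
        raise ValueError("not a RangeAssignor rebalance line")
    partition_count = len([p for p in parts.group(1).split(",") if p.strip()])
    thread_ids = [t.strip() for t in consumers.group(1).split(",") if t.strip()]
    # Assumption: thread ID = process ID + "-<thread number>".
    per_process = Counter(t.rsplit("-", 1)[0] for t in thread_ids)
    return partition_count, per_process


if __name__ == "__main__":
    partition_count, per_process = summarize_rebalance(SAMPLE_LINE)
    total = sum(per_process.values())
    for process, threads in per_process.items():
        print(f"{process}: {threads} thread(s)")
    print(f"{total} threads for {partition_count} partitions, "
          f"{max(total - partition_count, 0)} left without a partition")
```

Any process ID in that list that you don't recognize is worth tracking down, since it is still registered in the same consumer group.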

#10

There were some other logstash instances running which I was not aware of. Thanks again @Joe_Lawson for pointing this out.

