Have you got more complete debug output from filebeat?
Are your partitions replicated? By default the kafka client (the sarama library) pushes messages to all partitions at random, so messages are roughly balanced across all partitions in a topic. When pushing to a partition, the message is sent to the leader of that partition. If a leader goes down (and the cluster detects it), kafka will elect another leader for that partition. By default kafka re-elects the leader for a partition every 10 minutes (I think).
How partitions are handled in the kafka client is up to the dispatch in partitionProducer. If sending fails, it is retried a few times by the same partitioner before being returned to filebeat, which reschedules it onto another partition (due to the randomness it might end up on the same partition again).
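For illustration, here is a minimal sketch of how those behaviours map onto the sarama producer config that beats builds on. The broker address, topic name, and the retry/backoff values are just placeholder assumptions, not what filebeat actually configures:

```go
package main

import (
	"log"
	"time"

	"github.com/Shopify/sarama"
)

func main() {
	cfg := sarama.NewConfig()

	// Random partitioner: each message gets a random partition, which is
	// what spreads the load roughly evenly across the topic.
	cfg.Producer.Partitioner = sarama.NewRandomPartitioner

	// How often the client refreshes cluster metadata and thereby learns
	// about a newly elected partition leader (defaults to 10 minutes).
	cfg.Metadata.RefreshFrequency = 10 * time.Minute

	// How often a failed send is retried internally before the error is
	// surfaced to the caller (filebeat), which then reschedules the event.
	cfg.Producer.Retry.Max = 3
	cfg.Producer.Retry.Backoff = 100 * time.Millisecond

	producer, err := sarama.NewAsyncProducer([]string{"localhost:9092"}, cfg)
	if err != nil {
		log.Fatal(err)
	}
	defer producer.Close()

	// Errors that exhausted their retries come back on this channel.
	go func() {
		for sendErr := range producer.Errors() {
			log.Println("send failed:", sendErr)
		}
	}()

	producer.Input() <- &sarama.ProducerMessage{
		Topic: "filebeat",
		Value: sarama.StringEncoder("hello"),
	}
}
```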
The current leader for a partition is handled by brokerProducer. If sending fails, the brokerProducer is shut down, giving the partitionProducer the chance to find another leader. As long as all brokers can still see each other, no other leader will be elected though. That's why sarama keeps track of both all partitions and the writable partitions.
Selecting the partition is up to the topicProducer. This is where the library checks whether it should distribute messages to all partitions or to writable partitions only.
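To make that concrete, here is a simplified paraphrase of that selection step (not the verbatim sarama source): the partitioner's RequiresConsistency() answer decides whether the candidate set is all partitions or only the ones with a reachable leader.

```go
package sketch

import "github.com/Shopify/sarama"

// choosePartition is a simplified sketch of the topicProducer's logic:
// the partitioner decides the candidate set, then picks an index into it.
func choosePartition(client sarama.Client, partitioner sarama.Partitioner,
	msg *sarama.ProducerMessage) (int32, error) {

	var (
		partitions []int32
		err        error
	)

	if partitioner.RequiresConsistency() {
		// All partitions, including those whose leader is currently down.
		partitions, err = client.Partitions(msg.Topic)
	} else {
		// Only partitions that currently have a reachable leader.
		partitions, err = client.WritablePartitions(msg.Topic)
	}
	if err != nil {
		return -1, err
	}

	// The partitioner picks an index into the candidate set.
	idx, err := partitioner.Partition(msg, int32(len(partitions)))
	if err != nil {
		return -1, err
	}
	return partitions[idx], nil
}
```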
Thinking about this, there are 2 things one can try:
- have replicas for the partitions, so that another leader will eventually be elected by the cluster (which might take quite a while, though)
- make the partitioner strategy in beats configurable. The default partitioner always returns true for `tp.partitioner.RequiresConsistency()`, forcing the client to schedule messages onto all partitions instead of writable partitions only (see the sketch after this list).
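As a sketch of what such a configurable strategy could look like (this is not an existing beats option, just an illustration against sarama's Partitioner interface): a hypothetical wrapper around the random partitioner that reports RequiresConsistency() as false, so the producer only considers writable partitions.

```go
package main

import "github.com/Shopify/sarama"

// writableOnlyPartitioner is a hypothetical wrapper around sarama's random
// partitioner; by reporting that it does not require consistency, it makes
// the producer pick among writable partitions only.
type writableOnlyPartitioner struct {
	inner sarama.Partitioner
}

func NewWritableOnlyPartitioner(topic string) sarama.Partitioner {
	return &writableOnlyPartitioner{inner: sarama.NewRandomPartitioner(topic)}
}

func (p *writableOnlyPartitioner) Partition(msg *sarama.ProducerMessage, numPartitions int32) (int32, error) {
	return p.inner.Partition(msg, numPartitions)
}

// Returning false here is the whole point: the topic producer then fetches
// WritablePartitions() instead of Partitions().
func (p *writableOnlyPartitioner) RequiresConsistency() bool {
	return false
}

func main() {
	cfg := sarama.NewConfig()
	cfg.Producer.Partitioner = NewWritableOnlyPartitioner
	_ = cfg // a real setup would pass cfg to sarama.NewAsyncProducer
}
```

The trade-off is exactly the imbalance risk described below: once messages only go to writable partitions, data is no longer spread evenly across the topic.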
I see this as an enhancement request rather than a bug. Normally the kafka producer is responsible for balancing messages roughly equally among all partitions. By pushing messages to 'available' brokers only, there is a chance of imbalance between brokers/partitions, eventually leading to other problems such as:
- Increased disk usage on the available brokers, eventually running out of disk space. The sizing no longer matches the requirements; in the worst case, each broker must be sized to hold all the data.
- Unbalanced processing of data in kafka (consumers not well balanced), in case one consumer group handles only a single partition.
Still worth an enhancement request on github. If you're so kind as to create the ticket, I will mark it for the pioneer program.