[bug] Filebeat can't send data to a Kafka cluster when one broker is unreachable because of a network failure

  • Filebeat version: 5.0.0-alpha4
  • Operating System: debian 8

I use Filebeat to collect logs on computer A and send them to a Kafka cluster made up of two brokers running on two computers named B and C.
Today I tested the high availability of this setup. Here is what I did:

I configured the firewall to cut off the network between A and B, hoping that Filebeat would still send logs to C, but it did not. This is the Filebeat log:

2016-07-22T15:43:34+08:00 WARN kafka message: client/metadata got error from broker while fetching metadata:%!(EXTRA *net.OpError=read tcp A's ip:37157-> B's ip:9093: i/o timeout
2016-07-22T15:43:34+08:00 WARN Closed connection to broker B's ip:9093

2016-07-22T15:43:34+08:00 WARN client/metadata fetching metadata for [t_log] from broker C's ip:9093

2016-07-22T15:43:34+08:00 WARN Connected to broker at C's ip:9093 (unregistered)

2016-07-22T15:43:34+08:00 WARN producer/broker/2 starting up

2016-07-22T15:43:34+08:00 WARN producer/broker/2 state change to [open] on t_log/5

2016-07-22T15:43:34+08:00 WARN producer/leader/t_log/5 selected broker 2

2016-07-22T15:43:34+08:00 WARN producer/leader/t_log/5 state change to [flushing-1]

2016-07-22T15:43:34+08:00 WARN producer/leader/t_log/5 state change to [normal]

Broker 2 is the Kafka instance on computer B.
Filebeat can't get the metadata from B because of the firewall on A, so it fetches the metadata from C instead. But C can still connect to B, so C thinks B is alive. C therefore sends metadata to A that tells A to keep pushing data to B. As a result, the data never reaches the Kafka cluster.
When I kill the Kafka process on B, Filebeat can send data to C.

I think this is a bug. How can I fix it?

Have you got more complete debug output from Filebeat?

Are your partitions replicated? By default the Kafka client (the sarama library) pushes messages to all partitions at random, so messages are roughly balanced across all partitions in a topic. When pushing to a partition, the message is sent to the leader of that partition. If a leader goes down (and the cluster detects it), Kafka chooses another leader for that partition. By default Kafka re-elects the leader for a partition every 10 minutes (I think).

Handling partitions in the Kafka client is up to dispatch in partitionProducer. If sending fails, it is retried a few times for the same partition before the message is returned to Filebeat to be rescheduled onto another partition (due to randomness it might be rescheduled onto the same partition).

The current leader for a partition is handled by brokerProducer. If sending fails, the brokerProducer is shut down, giving partitionProducer the chance to find another leader. As long as all brokers can still see each other, no other leader will be selected, though. That's why sarama keeps track of both all partitions and writable partitions.

Selecting the partition is up to the topicProducer. This is where the library checks whether it should distribute to all partitions or to writable partitions only.

Thinking about this, there are 2 things one can try:

  • have replicas for partitions, so another leader will eventually be elected by the system (this might take very long, though)
  • make partitioner strategy in beats configurable. The default partitioner always returns true for tp.partitioner.RequiresConsistency(), forcing the client to schedule messages onto all partitions instead of writable only partitions.
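
The choice between "all partitions" and "writable partitions only" can be pictured with a small stand-alone sketch. This is a simplified model, not sarama's code: the `partition` type, its `Writable` flag and the `candidates` function are hypothetical names made up for illustration, and only the `requiresConsistency` switch mirrors the `RequiresConsistency()` check mentioned above.

```go
package main

import "fmt"

// partition is a hypothetical, simplified view of a topic partition.
type partition struct {
	ID       int32
	Writable bool // assumption: true if the client considers the leader usable
}

// candidates returns the partition IDs a producer may choose from.
// With requiresConsistency set, every partition stays a candidate,
// even one whose leader cannot be used right now.
func candidates(parts []partition, requiresConsistency bool) []int32 {
	ids := make([]int32, 0, len(parts))
	for _, p := range parts {
		if requiresConsistency || p.Writable {
			ids = append(ids, p.ID)
		}
	}
	return ids
}

func main() {
	parts := []partition{{0, true}, {1, true}, {2, false}, {3, true}}
	fmt.Println(candidates(parts, true))  // all partitions: [0 1 2 3]
	fmt.Println(candidates(parts, false)) // writable only: [0 1 3]
}
```

In this model a message scheduled with `requiresConsistency` set can still land on partition 2, which is the "stuck" case from the original report.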

I see this as an enhancement request rather than a bug. Normally the Kafka producer is responsible for balancing messages about equally among all partitions. Pushing messages to 'available' brokers only risks an imbalance between brokers/partitions, eventually leading to other problems like:

  • Increased disk usage on the available brokers, eventually running out of disk space. Sizing no longer matches requirements. In the worst-case sizing scenario, each broker must be sized to contain all data.
  • Unbalanced processing of data in Kafka (consumers not well balanced), in case one consumer group handles a single partition only.
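
To put rough numbers on the imbalance, here is a toy calculation. The figures (1200 messages, 4 partitions, one of them skipped) and the `distribute` helper are assumptions chosen for illustration, and plain round-robin stands in for whatever partitioner is actually used.

```go
package main

import "fmt"

// distribute assigns n messages round-robin over the given partition IDs
// and returns how many messages each partition received.
func distribute(n int, ids []int32) map[int32]int {
	counts := make(map[int32]int, len(ids))
	if len(ids) == 0 {
		return counts // nothing to send to
	}
	for i := 0; i < n; i++ {
		counts[ids[i%len(ids)]]++
	}
	return counts
}

func main() {
	all := distribute(1200, []int32{0, 1, 2, 3})
	reachable := distribute(1200, []int32{0, 1, 3}) // partition 2 skipped

	// Per-partition load grows from 300 to 400 messages when one of
	// four partitions is left out.
	fmt.Println(all[0], reachable[0])
}
```

With one of four partitions skipped, each remaining partition takes a third more data than it was sized for, which is the disk-usage concern described above.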

Still worth an enhancement request on GitHub. If you would be so kind as to create the ticket, I will mark it for the pioneer program.

Thanks for your answer. I have a question.
In your answer you said:

- make partitioner strategy in beats configurable. The default partitioner always returns true for tp.partitioner.RequiresConsistency(), forcing the client to schedule messages onto all partitions instead of writable only partitions.

Do you mean I should modify the libbeat code, or can I configure this in filebeat.yml? If it can be configured in filebeat.yml, how do I configure it? I can't find the parameter.

Thanks again for your reply!

Let's start with an enhancement request on GitHub describing the problem you ran into and asking to make partition selection more configurable (optionally ignoring unavailable brokers).

The code in Filebeat needs to be modified to support different partitioning schemes (e.g. random, round-robin, hash, including all partitions or available ones only). More code changes are then needed to make these available from filebeat.yml. This applies not just to Filebeat, but to all beats.
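
As a rough idea of what "different partitioning schemes selected by name" could look like, here is a stand-alone sketch. Everything in it is hypothetical: the `partitioner` interface, the `roundRobin` and `hashed` types and the names accepted by `newPartitioner` are invented for illustration and are not the libbeat or sarama API.

```go
package main

import (
	"fmt"
	"hash/fnv"
)

// partitioner is a hypothetical interface, not the libbeat or sarama API.
type partitioner interface {
	// pick returns an index in [0, n) for a message key; n must be > 0.
	pick(key string, n int) int
}

// roundRobin cycles through partitions regardless of the key.
type roundRobin struct{ next int }

func (r *roundRobin) pick(_ string, n int) int {
	i := r.next % n
	r.next++
	return i
}

// hashed maps equal keys to the same partition using FNV-1a.
type hashed struct{}

func (hashed) pick(key string, n int) int {
	h := fnv.New32a()
	h.Write([]byte(key))
	return int(h.Sum32() % uint32(n))
}

// newPartitioner selects a scheme by name, as a config option might.
func newPartitioner(name string) (partitioner, error) {
	switch name {
	case "round_robin":
		return &roundRobin{}, nil
	case "hash":
		return hashed{}, nil
	default:
		return nil, fmt.Errorf("unknown partitioner %q", name)
	}
}

func main() {
	p, err := newPartitioner("round_robin")
	if err != nil {
		panic(err)
	}
	for i := 0; i < 4; i++ {
		fmt.Println(p.pick("host-a", 3)) // prints 0, 1, 2, 0 on separate lines
	}
}
```

The index returned by `pick` would then be applied to whichever list of partitions the output decides to use, all of them or only the available ones.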

This PR will make the Kafka partitioner configurable and add an option to ignore unreachable partitions, so the output doesn't get stuck. This should help with the scenario described.

Can you please create a GitHub issue? I'd like to mark the ticket for the pioneer program:

We’re very excited about these upcoming Elastic Stack Version 5.0 releases, but we want to make sure they’re perfect (or as perfect as software gets) before they ship more broadly. Everyone who reports a legitimate bug on the pre-release of the software will be recognized for the wider release, and receive a special Elastic gift package as our thank you. If you find something particularly tricky, you could also earn a free ticket to Elastic{ON}17, coming up next March 7 – 9 in San Francisco.