Filebeat stops shipping data when Kafka broker is unavailable

Hello. I noticed an issue with my Filebeat & Kafka environment. Can anyone confirm the behaviour described below? If so, I'll report it on GitHub to be fixed.

Reproduction steps:

  • Filebeat is configured to ship data to Kafka brokers
  • there are three Kafka brokers defined in Filebeat configuration
  • Kafka brokers are synchronizing data between themselves - so it only needs to reach one of the brokers, no need to ship to all of them
  • all three brokers are active
  • one of the brokers is unreachable (network) from the Filebeat location

Expected result:

  • Filebeat is able to properly ship data to any of the remaining brokers

Actual result:

  • Filebeat has problems handling the issue and, after some time, stops shipping data completely
  • after being restarted Filebeat properly ships all delayed logs (despite one of the brokers still being inactive)
  • after some time passes, the problem reoccurs and logs stop being shipped

At first I thought it was just the (strange) way Filebeat was designed and that I needed to manually configure it not to try to ship to unavailable brokers. See https://github.com/elastic/beats/issues/2626 or [bug] filebeat can't send data to kafka cluster when one kafka can't connect because of network for reference. Sadly, after enabling the "reachable_only: true" parameter the situation got worse. Previously it took Filebeat around two hours to stop shipping logs; with the new parameter the time shrank to 30 minutes.
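
For reference, this is (roughly) the fragment I enabled in the Kafka output section; treat the exact nesting as approximate and check it against the Filebeat docs:

output.kafka:
  # publish only to partitions whose leader broker is currently reachable
  partition.round_robin:
    reachable_only: true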

I enabled DEBUG logging for Filebeat, but didn't see anything that I would find useful for solving the issue on my end.

So to summarize - looks like some sort of memory leak to me? The process seems to have some internal problem that slowly kills it.

Environment:

  • Debian 9.5
  • Filebeat 6.3.2
  • Kafka 2.11-1.1.0

Filebeat configuration:

filebeat.inputs:

filebeat.prospectors:
- type: log
  enabled: true
  paths:
    - /path1/*.log
    - /path2/*.log
  multiline.pattern: '^[0-9]{4}\-'
  multiline.negate: true
  multiline.match: after
  fields_under_root: true
  fields:
    service: xxx
    profile: yyy

processors:
 - drop_fields:
     fields: ["host"]

filebeat.config.modules:
  path: ${path.config}/modules.d/*.yml

  reload.enabled: false

setup.template.settings:
  index.number_of_shards: 3

setup.kibana:

output.kafka:
  hosts: ["dns1:port", "dns2:port", "dns3:port"]

  topic: "zzz"
  ssl.certificate_authorities: ["/etc/ssl/certs/somecert.pem"]
  ssl.enabled: "true"
  ssl.verification_mode: "full"
  ssl.supported_protocols: "TLSv1.2"
  required_acks: 1
  compression: gzip
  max_message_bytes: 1000000

Given your reproduction steps and actual result, I'd say Filebeat is behaving correctly. It is not having problems sending data; it detects that one of the brokers is not reachable and therefore blocks until your cluster is stable again.

With Kafka, lots of logic is pushed into the clients, especially the producers. In Kafka one creates a topic with N partitions. These partitions control the amount of load balancing you have in the system. With N partitions, the data should be distributed about equally among them. Partitions are stored on disk (and partitions in turn are split into segment files). Sizing and retention policies must take into account the amount of data to be stored in a partition. The default retention policy is by time, not by size. A common issue people run into: the disk is full.
Partitions are not only about scaling disk usage. Consumers read from partitions. A consumer in a consumer group reads from one or more partitions, but each partition is read by at most one consumer in the group. That is, the number of partitions also controls the amount of scaling you can have downstream. Proper load balancing in the consumers depends on the producers properly load-balancing events among the partitions.

Producers and consumers do not operate against single brokers, but against the cluster. When connecting a producer/consumer to a cluster, the client starts by bootstrapping its broker connections. During bootstrap, one broker from the list of configured brokers is used to query the cluster metadata. The metadata contains the cluster state and all brokers plus their advertised hostnames. A client then creates connections to the cluster brokers based on the cluster metadata, not based on the initial configuration.

  • Filebeat is configured to ship data to Kafka brokers
  • there are three Kafka brokers defined in Filebeat configuration

The configured hosts are only used for the connection bootstrap. The Kafka cluster metadata determines where events are actually sent.
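
To make that concrete, a minimal sketch (the broker names are placeholders): even a single reachable entry in hosts is enough for the bootstrap, because the actual publish targets come from the cluster metadata:

output.kafka:
  # only used to bootstrap the connection and fetch cluster metadata;
  # events are then sent to the partition leaders advertised in that metadata
  hosts: ["broker1:9092", "broker2:9092", "broker3:9092"]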

  • Kafka brokers are synchronizing data between themselves - so it only needs to reach one of the brokers, no need to ship to all of them

State in Kafka is very localized. Beats (or any other client) do not publish events to brokers; they publish events to partitions. Brokers act as 'leader' or 'follower' for partitions. A client must send data to the leader broker in order to publish an event to a partition. The leader forwards events to the replicas (follower brokers) only. A replica can be in sync or out of sync. If the leader broker goes down, the partition can be served by another broker if and only if there is another in-sync replica broker holding the same partition. If not, the partition is unavailable. Having replicas is not just about data safety, it's also a way of failing over in case of single-broker outages.

  • one of the brokers is unreachable (network) from the Filebeat location

This is a major problem with Kafka; do not operate Kafka in a different data center from your producers. With Kafka, outages are expected to be spurious and short-lived. When a broker becomes unreachable, a number of partitions become unreachable with it.

Having unreachable partitions, we can follow 2 strategies:

  1. block until unreachable partitions are available again
  2. distribute events only among the reachable partitions

Each strategy comes with advantages and disadvantages. The disadvantage of strategy 1 is that long-running network failures will block clients. The disadvantage of strategy 2 is very uneven load balancing: at worst, all events might end up in one partition only. This must be taken into account when sizing and deciding on retention periods. E.g. retention by size might lead to data loss at worst, while retention by time might lead to the disk running out of space -> another set of partitions becomes unavailable...

Using output.kafka.partition.round_robin.reachable_only you choose strategy 1 by setting it to false (the default) and strategy 2 by setting it to true.
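
In config terms, a sketch (the value shown is just to illustrate the two options, not a recommendation):

output.kafka:
  partition.round_robin:
    # false (the default): distribute events across all partitions of the topic;
    #   an unreachable partition eventually blocks publishing (strategy 1)
    # true: distribute events only among currently reachable partitions (strategy 2)
    reachable_only: false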

Network outages/hiccups should be the exception, not the norm. The Kafka cluster itself might still be correct, so clients have to keep trying to reconnect. The producer operates asynchronously to Beats and retries scheduled events. If it keeps failing, events might be returned for retry. But the broker is still the 'official' broker for the partition (the cluster metadata says the broker is available), so the client has to retry. Grep your logs for the regex 'producer.*state change to.*retry'. In this state the partition is subject to event retries, yet the retries are failing.
If your logs contain a message saying there is currently no leader for this partition, then the cluster has no leader for that partition. In this case the client will try to reroute events to other partitions.
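
If it helps when checking the logs, the debug output can be narrowed down via logging selectors; a sketch (the selector name "kafka" is an assumption on my part, please verify it against the docs):

logging.level: debug
# assumed selector covering the kafka output / producer messages
logging.selectors: ["kafka"]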

In Beats, the memory queue defines an upper bound on the number of events buffered for publishing. Outputs must ACK events in order to free up slots in the ring buffer. If one broker holds on to events for retrying, the queue runs full and blocks -> the other brokers will not receive events either.
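
For reference, that upper bound is the memory queue; a sketch with what I believe are the 6.x defaults (please double-check them against the docs for your version):

queue.mem:
  events: 4096            # upper bound on events held in the ring buffer
  flush.min_events: 2048  # hand a batch to the output once this many events are buffered
  flush.timeout: 1s       # or after this timeout, whichever comes first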

  • after being restarted Filebeat properly ships all delayed logs (despite one of the brokers still being inactive)
  • after some time passes, the problem reoccurs and logs stop being shipped

After a restart Filebeat doesn't yet know about the network problem and schedules events among all partitions. Each partition/broker publishes events independently. The cluster state returned by Kafka tells the producer that the cluster is operating correctly, so the producer has to try. The partitions that are reachable can be served; those that are not will fail after some timeout. Beats stop sending events once the output buffer is full and one partition is detected to be unreachable.

So to summarize - looks like some sort of memory leak to me? The process seems to have some internal problem that slowly kills it.

Given your report I don't see any evidence of a memory leak. The process is not slowly being killed; it has to retry. Even with reachable_only: false, a broker/partition might be subject to retries. For retrying, events must be re-sent, and while retries are happening, overall progress may slow down.

@steffens, first of all - thank you for a thorough reply. I do appreciate you taking your time to reply in such detail.

Your point of view on the subject is quite different from mine. Let me wrap my head around what you wrote, think it through, test a few things and get back to you.

@steffens I did some testing. Here are the results:

Scenario 1 - all of the brokers are up and running, but one of them is network-unreachable for Filebeat. Parameter "round_robin.reachable_only" is set to TRUE.
Result - Filebeat dies after queue runs full. In my case it's around two hours.

Scenario 2 - all of the brokers are up and running, but one of them is network-unreachable for Filebeat. Parameter "round_robin.reachable_only" is set to FALSE.
Result - no change. Filebeat dies after queue runs full. In my case it's around two hours.

Scenario 3 - all of the broker machines are network-reachable for Filebeat, but one of the brokers goes down. Parameter "round_robin.reachable_only" is set to TRUE.
Result - everything seems to work, data is shipped to other brokers. Tested twice - one test was about 4 hours long, the other was 5 hours long.

It seems to me that the scenario where one of the brokers is reported as OK, but Filebeat cannot reach it, was simply not taken into consideration and cannot be handled by the code. While I understand the approach you explained in the previous post, maybe it would be a good idea to think about handling it?

Anyway, back to my logs:
They do contain the string "producer.*state change to.*retry" in scenarios 1 and 2; in scenario 3 they do not.
I didn't find any "no leader" message in any of the scenarios.

Am I correct in assuming that in scenarios 1 and 2 Filebeat does not consider its data to be ACKed by Kafka?

With Filebeat dying you mean it stops sending?

Am I correct in assuming that in scenarios 1 and 2 Filebeat does not consider its data to be ACKed by Kafka?

Right. If Filebeat does not get its events ACKed, the queues will run full and block.

Scenarios 2 and 3 do match my expectations.

Scenario 1 kind of surprises me, but then it does not :slight_smile:
But yeah, I would prefer it to act more similar to Scenario 3.

In Scenario 3 the partition becomes marked as unavailable within the cluster. In Scenario 1 the partition is not marked as unavailable, as it's still live within the cluster. Instead an I/O error occurred, but the I/O error might be (and should be) spurious. On error the library retries. Part of the problem is that the partitioner assigns events to one cluster-available partition. As the partition is live within the cluster, the partitioner can assign events to it. Once an event is assigned to a live partition, it will be retried on this partition only; there is no re-routing on error. Only if the partition becomes unavailable in the cluster does the client error out, forcing events to be re-partitioned (as has been shown by Scenario 3).

Filebeat does not implement the Kafka client itself, but uses a third-party Kafka client. That is, we only have limited control over marking partitions as active or failed. But we can try to break the 'stable' partitioning on error. Do you have a go build environment so you can build filebeat yourself? As you noticed, testing the scenarios requires quite some effort and time. I hope you can help here.

Note: This is not a full fix, but a code change to check whether Scenario 1 can act more similarly to Scenario 3 (more similar meaning there is still very heavy overhead due to retries, as the partition cannot be marked as unavailable). In libbeat/output/kafka/client.go we can modify the error handler to undo the event scheduling (set msg.partition = -1) by changing msgRef.fail (line 227) to:

func (r *msgRef) fail(msg *message, err error) {
	switch err {
	case sarama.ErrInvalidMessage:
		logp.Err("Kafka (topic=%v): dropping invalid message", msg.topic)

	case sarama.ErrMessageSizeTooLarge, sarama.ErrInvalidMessageSize:
		logp.Err("Kafka (topic=%v): dropping too large message of size %v.",
			msg.topic,
			len(msg.key)+len(msg.value))

	default:
		msg.partition = -1 // undo partitioning, so the event can be sent to another partition
		r.failed = append(r.failed, msg.data)
		r.err = err
	}
	r.dec()
}

This change is somewhat heavy if a broker is unreachable for longer periods. With it we try to enforce progress by rescheduling events onto another partition. But still, the partition is alive within the cluster, so events will keep being scheduled onto that partition and the client needs to retry.

The queue in Filebeat and the client have multiple asynchronous steps, and some elements like batch sizes are dynamic, subject to the number of events, encoded event sizes and flush timeouts. This makes behavior hard to predict, as the interaction of states can become quite chaotic. But assuming the queues in Filebeat are mostly filled, we can try a slight optimisation by always scheduling N subsequent events onto the same partition. That way, if there is an I/O error, a whole batch of events gets rescheduled onto another partition (optimal values are subject to tuning):

output.kafka:
  ...
  partition.round_robin.reachable_only: false
  partition.round_robin.group_events: 512 # number of subsequent events sent to the same partition

  bulk_max_size: 512  # number of events drawn from filebeat queue at once

Assuming we always get full batches from the queue, this config guarantees that a full batch is scheduled to be published on the same partition. If publishing fails, the complete batch will be rescheduled onto another partition. This way only one batch of events gets blocked every now and then.

With Filebeat dying you mean it stops sending?

Yes I do.

Do you have a go build environment so you can build filebeat yourself? As you noticed, testing the scenarios requires quite some effort and time. I hope you can help here.

I don't have one. From a technical point of view nothing stops me from preparing one, but... if we need to go as deep as modifying the sources and building Filebeat manually in order to test new options, I'm afraid time will be a limitation here. I'd love to keep working with you on this subject, but there are other duties I need to take care of, so I believe we will need to stop for now.

Conclusions so far:

  • a scenario where a Kafka broker is OK, but is not reachable from Filebeat, results in the queue running full
  • in order to change this behaviour, a code modification would be necessary

Thank you for your help so far. If I have time to get back to our tests, I will surely let you know.

Yep.

If I have time to get back to our tests, I will surely let you know.

Thank you.
