Filebeat can overwhelm the target (Kafka, Logstash, Elasticsearch) on recovery

Our config is:
filebeats -> Kafka

Approximately 1000 AWS instances, mostly running Rails, log roughly 10 million lines per day into a 3-node Kafka / 3-node ZooKeeper cluster behind an ELB.

A few days ago, we lost Kafka in the middle of the night due to a space issue. Beats could not deliver logs, so each node kept the last successfully delivered line recorded in its registry, as it should.

In the morning Kafka was rebuilt with more space and a more conservative retention period. However, by this time a significant backlog of undelivered logs had accumulated on the 1000 nodes.

When the Kafka cluster came back online, traffic per Kafka node went from a normal 7 million bytes per minute to 700 million bytes per minute, where it remained flatlined (maxed out) for several hours. During this time Kibana gained only about one hour of logs, roughly 40 million lines, most of which were duplicates.

I reconfigured beats to set tail_files, removed the registry file during the beats restart, and everything returned to normal (minus the night's logs).

It appears that filebeat may be too aggressive in trying to recover from the out-of-sync condition.

1000 nodes trying to deliver a backlog of logs overwhelmed the endpoint; this is not necessarily specific to Kafka and could equally happen with Redis, Logstash, or Elasticsearch.

Coincidentally, another anomaly occurs where duplicate harvesters are opened on the same log files, which may exacerbate the issue: the registry entry written by one beats harvester is overwritten with an old pointer from the failed results of another. This can cause the same log lines to be delivered to the endpoint almost endlessly (although it seemed to stop at 2.4 million identical lines).

Filebeat becomes a highly efficient Gatling gun, hammering the endpoint with log lines, some (or most?) of them duplicates.

Given that Kafka does not return any information about the state of the cluster in its ACKs, it may be necessary to give users a parameter to throttle beats so that a runaway recovery can be managed, perhaps a maximum number of log lines per minute. Even if beats hammered for 10 seconds to deliver that maximum and then stopped until the next minute, it would be better than the relentless delivery of log lines.
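To make the suggestion concrete, here is a minimal sketch of such a cap: a fixed-window limiter that allows a burst up to the per-window maximum and then blocks until the next window opens. This is purely illustrative; filebeat does not expose such an option, and every name here is invented.

```go
// Hypothetical fixed-window rate limiter: burst up to max "lines", then stop
// until the window rolls over. Not a filebeat feature, just an illustration.
package main

import (
	"fmt"
	"time"
)

type windowLimiter struct {
	max      int
	sent     int
	window   time.Time
	interval time.Duration
}

func newWindowLimiter(max int, interval time.Duration) *windowLimiter {
	return &windowLimiter{max: max, interval: interval, window: time.Now()}
}

// allow blocks until the next window opens once the per-window budget is spent.
func (l *windowLimiter) allow() {
	if time.Since(l.window) >= l.interval {
		l.window = time.Now()
		l.sent = 0
	}
	if l.sent >= l.max {
		time.Sleep(l.interval - time.Since(l.window)) // wait out the rest of the window
		l.window = time.Now()
		l.sent = 0
	}
	l.sent++
}

func main() {
	lim := newWindowLimiter(5, time.Second) // 5 "lines" per second, just for the demo
	for i := 1; i <= 12; i++ {
		lim.allow()
		fmt.Println(time.Now().Format("15:04:05.000"), "deliver line", i)
	}
}
```

In filebeat such a limiter would presumably sit in front of the output rather than around individual lines, but the blocking behaviour would be the same.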

Eventually, it may be worthwhile to investigate some intelligent auto throttling based on the response time of the endpoint.

About the duplicates, I think you created another discuss topic, so I won't comment on them here.

Filebeat can already deal with back-pressure by simply slowing down reading files if the output cannot process events fast enough. The output, from filebeat's point of view, is just the sockets used to send events, and the slowdown can happen in the network, in any intermediate device, or at a destination limiting its input rate.
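As a toy illustration of that blocking behaviour (invented names, not filebeat's actual internals): a bounded channel between a fast "harvester" and a slow "output" makes the reader stall automatically once the buffer is full, which is essentially all the back-pressure mechanism amounts to.

```go
// A fast producer feeding a slow consumer through a small bounded buffer.
// The producer stalls on the channel send whenever the buffer is full.
package main

import (
	"fmt"
	"time"
)

func main() {
	spool := make(chan string, 3) // small bounded buffer, like the spooler

	// Slow consumer standing in for the output/network.
	go func() {
		for line := range spool {
			time.Sleep(200 * time.Millisecond) // the endpoint is slow
			fmt.Println("shipped:", line)
		}
	}()

	// Fast producer standing in for a harvester tailing a file.
	for i := 1; i <= 10; i++ {
		spool <- fmt.Sprintf("log line %d", i) // blocks when the buffer is full
		fmt.Println("read:", i)
	}
	close(spool)
	time.Sleep(3 * time.Second) // let the consumer drain (sketch only)
}
```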

What you're basically asking for is some kind of I/O throttling to limit total throughput (by events, bytes, or whatever). But then, what is the best throttling policy? How should throttling change or adapt over the course of the day? Are there other services requiring higher priority, perhaps silent most of the time or needing more bandwidth during peak times (QoS)? How should throttling adapt with N services pushing (and how would one filebeat figure out that N other filebeats are pushing)?

Beyond very simple rate limiting, proper adaptive throttling can easily become a global scheduling problem, with every user potentially having different requirements. I think policies could/should be implemented by the network and/or the Kafka endpoints, which have a better idea of which policies should be supported, given that filebeat sits mostly at the edge, outside the core network. For example, see this filebeat FAQ entry. For more examples, tutorials, and docs on how to control traffic on Linux, you can search for "linux traffic control" or "linux QoS". I have no experience with these services, but considering you are using AWS, have you had a look at Amazon Route 53?

Thanks for your observations.

The duplicate issue is already open in another Topic.

I am not sure the back-pressure feature is working properly. Here is a screenshot of the Kafka cluster trying to recover.

That is an interesting way to handle backpressure! Kill the servers!

Can you give me a pointer to the source code where filebeat is managing the back-pressure?

I would like to have a look!

Back-pressure occurs at the network level only and is subject to TCP congestion control. Filebeat uses one socket per Kafka broker. That's it; there is no additional algorithm trying to detect anything. Back-pressure occurs if the endpoint cannot process data as fast as beats sends events, or due to limits in the network. If the endpoint can keep up, beats will try to push as fast as possible.

In general, creating/adding/handling back-pressure relies on protocol features. Here we have TCP slow-start and windowing, plus Kafka ACKs, plus filebeat using sync mode (wait for the ACK before sending another batch of events).

That is, in order to create additional artificial 'pressure', rate-limiting or QoS policies would have to be applied at the network level.
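For illustration, a toy version of that sync + guaranteed behaviour might look like the following. None of this is the real libbeat API; the client interface, the flaky client, and its failure pattern are invented.

```go
// Toy sketch of "publish a batch, block until every event is ACKed, retry the rest".
package main

import (
	"fmt"
	"time"
)

// client mimics an output that must ACK a batch before the next one is sent.
type client interface {
	// publishEvents returns the events that were NOT ACKed.
	publishEvents(events []string) []string
}

// flakyClient pretends only the first half of each batch gets ACKed, forcing retries.
type flakyClient struct{}

func (flakyClient) publishEvents(events []string) []string {
	fmt.Printf("sending %d events\n", len(events))
	return events[:len(events)/2] // the "first half" is returned as not ACKed
}

// publishSync blocks until the whole batch is ACKed, retrying the remainder.
// While it is blocked, the spooler upstream cannot flush and the harvesters
// eventually stop reading: that blocking chain is the back-pressure.
func publishSync(c client, events []string) {
	for len(events) > 0 {
		events = c.publishEvents(events)
		if len(events) > 0 {
			time.Sleep(100 * time.Millisecond) // back off before retrying
		}
	}
}

func main() {
	publishSync(flakyClient{}, []string{"a", "b", "c", "d"})
}
```

The point is that the retry loop itself blocks the caller; nothing upstream can hand over more data until the endpoint has ACKed what is in flight.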

Without having seen your config I can walk you through the default case:

  1. filebeat collects events created by harvesters into a spooler. The spooler flushes its buffer either when the buffer is full or when an internal timeout occurs. In your case the buffer will be full almost all the time, because Kafka is ACKing events all the time.

  2. once the spooler flushes its buffer, by default the syncLogPublisher takes over, asking libbeat to publish the buffer with the Sync and Guaranteed flags set. Guaranteed ensures the output retries on network failures until all events have been ACKed. The Sync flag ensures PublishEvents blocks until the buffer is ACKed. That is, while the spooler may buffer new events, it will be blocked on internal channels and the publisher until Kafka has ACKed the current batch.

  3. the libbeat sync publisher pipeline directly forwards the batch to all outputs using a syncing channel. The sync channel is used to wait for all outputs to ACK the events being published (on shutdown, requests will be cancelled). Only after having received the ACK from the output does the sync-publisher pipeline return.

  4. fast forward to the kafka output. The kafka output is based on the sarama library's AsyncProducer. The output ref-counts all events in the batch; the success and failure callbacks decrease the reference counter. Once the reference count reaches 0, the callback passed to AsyncPublishEvents is executed (see the sketch below).

  5. The callback retries sending the batch if publishing failed; otherwise it signals the sync-publisher pipeline that the batch has been ACKed.

  6. In the sarama library the events are partitioned randomly (the partitioning strategy will become configurable in beta1) and forwarded to the worker handling the partition's leader. The broker output worker buffers events and eventually flushes the buffer, finally sending all events to Kafka. Note: while sarama mostly works asynchronously, no new events will be sent to Kafka until all events in the active batch have been ACKed.

...

...
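To make steps 4 and 5 more concrete, here is a standalone sketch of the ref-counting idea built directly on sarama's AsyncProducer (the library the kafka output is based on). It is not libbeat's actual code; the broker address, topic, and batch contents are made up, and a real output would retry the failed subset instead of just reporting it.

```go
// Ref-counted batch ACK tracking on top of sarama's AsyncProducer (sketch only).
package main

import (
	"fmt"

	"github.com/Shopify/sarama"
)

func main() {
	cfg := sarama.NewConfig()
	// Report both successes and failures so every message in the batch
	// produces exactly one callback event.
	cfg.Producer.Return.Successes = true
	cfg.Producer.Return.Errors = true

	producer, err := sarama.NewAsyncProducer([]string{"kafka-1:9092"}, cfg)
	if err != nil {
		panic(err)
	}
	defer producer.Close()

	batch := []string{"log line 1", "log line 2", "log line 3"}
	done := make(chan bool, 1) // the batch-level "callback": true if fully ACKed

	// Drain success/error channels; each event decrements the reference count once.
	go func() {
		remaining := len(batch) // one reference per event in the batch
		failed := 0
		for remaining > 0 {
			select {
			case <-producer.Successes():
				remaining--
			case e := <-producer.Errors():
				fmt.Println("publish failed:", e.Err)
				failed++
				remaining--
			}
		}
		// Count reached zero: fire the batch-level callback. In Guaranteed
		// mode the failed subset would be retried here.
		allOK := failed == 0
		done <- allOK
	}()

	for _, line := range batch {
		producer.Input() <- &sarama.ProducerMessage{
			Topic: "filebeat",
			Value: sarama.StringEncoder(line),
		}
	}

	if <-done {
		fmt.Println("batch ACKed, the publisher may send the next batch")
	} else {
		fmt.Println("batch had failures and would be retried")
	}
}
```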

Due to the load-balancing nature of Kafka, throughput is somewhat increased. With beta1's support for configuring the partitioner strategy, one can configure the output to use the same partition for a complete batch, still distributing events between all partitions over time, but using only one partition per batch (artificially reducing the current throughput to the available bandwidth of the selected partition).

But as long as Kafka/Logstash/Elasticsearch keeps responding in a timely fashion, there is no way to detect back-pressure. Active traffic shaping is a good way to limit the total available bandwidth and thus create artificial back-pressure in filebeat.
