Throttling log output from Filebeat directly?

I'm working with a multi-tenant Kubernetes cluster, and we're seeing issues where certain apps running as Docker containers write to their log files at a very fast rate, whether due to spikes in app activity or apps logging at TRACE level (don't get me started).
I've noticed back pressure being applied, and Filebeat slows down because our Logstash workers can't keep up with the volume of logs being sent. This means that logs for our other clients, who are logging at a "normal" rate, sometimes aren't sent to Logstash in a timely manner. Those clients report long latency before their logs show up in Elasticsearch.
I'm trying to come up with a multi-tenancy solution where we could throttle log output for just the heavy loggers so that normal loggers aren't affected. It would possibly involve dropping log output once a certain logging-rate threshold is reached for a given Docker container and replacing it with a warning log message every X events, something like "This app is logging too much right now, so its logs will be dropped".
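To make the idea concrete, here's a minimal Go sketch of the kind of per-container throttling I have in mind: a token bucket per container that drops events above a threshold and injects a warning every N dropped events. The names (maxEPS, warnEvery) are purely illustrative and don't correspond to any real Filebeat options.

```go
package main

import (
	"fmt"

	"golang.org/x/time/rate"
)

// containerLimiter is a hypothetical per-container throttle, not Filebeat code.
type containerLimiter struct {
	limiter   *rate.Limiter
	dropped   int
	warnEvery int
}

func newContainerLimiter(maxEPS float64, warnEvery int) *containerLimiter {
	return &containerLimiter{
		limiter:   rate.NewLimiter(rate.Limit(maxEPS), int(maxEPS)),
		warnEvery: warnEvery,
	}
}

// allow reports whether a log line should be forwarded. When the rate is
// exceeded the line is dropped and, every warnEvery drops, a replacement
// warning message is forwarded instead.
func (c *containerLimiter) allow(line string) (string, bool) {
	if c.limiter.Allow() {
		return line, true
	}
	c.dropped++
	if c.dropped%c.warnEvery == 0 {
		return fmt.Sprintf("This app is logging too much right now; %d events dropped so far", c.dropped), true
	}
	return "", false
}

func main() {
	lim := newContainerLimiter(2, 3) // 2 events/sec, warn on every 3rd drop
	for i := 0; i < 10; i++ {
		if out, ok := lim.allow(fmt.Sprintf("event %d", i)); ok {
			fmt.Println(out)
		}
	}
}
```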
I know this would need to be done in Filebeat directly and would probably require building our own custom version of Filebeat. What I want is some input on how best to achieve this, and some insight from the development team on whether this issue is on their radar for future releases.


The throttling you're asking for is unfortunately not yet supported by Filebeat. It's not trivial to implement, depending on when and under which conditions one wants to throttle, or even apply a global quota (e.g. set the allowed rate to 0 for the next hour if an app behaves really badly).

An attempt to introduce event scheduling/throttling was made in this (somewhat outdated) PR: https://github.com/elastic/beats/pull/7082. However, we weren't fully satisfied with the approach and the flexibility it would provide.

Thanks, I'll take a look at your PR.

What would it take to get something like this merged into master? Is there a discussion thread or a design document to look at? My team is very interested in getting this functionality merged into the Elastic master branch, and we're interested in contributing if needed.

There are currently no discussions on this topic. I originally created the PR to start some discussion, but I wasn't happy with the implementation myself because:

  • It's limited to libbeat only. How can we easily hook in the data collectors in Metricbeat/Filebeat, so as to apply back pressure even earlier?
  • The PR only supports event counts, not byte counts. As events are not serialized at that point, it might help to integrate with the actual inputs, so as to reduce the number of bytes consumed in the first place (see the first point).
  • How feasible would it be to add quotas?
  • Fully dynamic QoS rules are a must, e.g. weighted fair queuing.
  • How do we better handle global state for a common group?
  • Introduce support for hard and soft limits, allowing users to apply processors once a soft limit is reached. E.g. enforce a hard rate limit of 10k events per second, but drop 'DEBUG' messages once 5k eps has been exceeded for the last N seconds (see the sketch after this list).
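To make the soft/hard limit point more concrete, here is a rough, hypothetical Go sketch (none of this is existing Beats code, and the types are made up): below the soft limit everything passes, between the soft and hard limits DEBUG events are shed, and above the hard limit the producer blocks.

```go
package main

import (
	"context"
	"fmt"
	"strings"

	"golang.org/x/time/rate"
)

// softHardLimiter is a made-up illustration of the soft/hard limit idea.
type softHardLimiter struct {
	soft *rate.Limiter // e.g. 5k eps: above this, shed low-priority events
	hard *rate.Limiter // e.g. 10k eps: above this, block the producer
}

func (l *softHardLimiter) publish(ctx context.Context, msg string) (bool, error) {
	// Soft limit: once exceeded, drop DEBUG messages instead of blocking.
	if !l.soft.Allow() && strings.Contains(msg, "DEBUG") {
		return false, nil
	}
	// Hard limit: block until the hard budget allows another event.
	if err := l.hard.Wait(ctx); err != nil {
		return false, err
	}
	fmt.Println("published:", msg)
	return true, nil
}

func main() {
	l := &softHardLimiter{
		soft: rate.NewLimiter(5000, 5000),
		hard: rate.NewLimiter(10000, 10000),
	}
	ctx := context.Background()
	l.publish(ctx, "DEBUG noisy detail")
	l.publish(ctx, "ERROR something broke")
}
```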

The problem is not about throttling only, but about event scheduling in general: when to publish which event from which input, and when/how to block inputs from consuming more data.

I'm happy to discuss the topic some more in the PR or in this thread, in order to refine some of the ideas.

Thanks for the reply Steffen. I'm beginning to realize how little I understand of the Beats architecture but I'll try to stay engaged and learn as quickly as I can.

I'd like to discuss the first bullet point in a little more detail to improve my understanding. Looking at your PR, I see that the event scheduling takes place in the libbeat pipeline. When back pressure is applied here (by blocking event publishing with a SchedulingPolicy), does that mean back pressure affects the Filebeat worker as a whole? That is, does Filebeat limit sending events from all of its harvesters, or just the one harvester (e.g. the one collecting data from a bad actor) whose events are being blocked by a SchedulingPolicy?

A Beat connects to the publisher pipeline and publishes via pipeline.Client. As of now, each configured input/prospector has its own Client instance (though I'd like to change this to one client per harvester in the future). The client pushes the event to a shared queue once the event has been normalized and processed (fields, tags, configured processors).

Currently back pressure can emerge in the outputs and in the shared queue. Once the queue is full, it blocks. Once the queue blocks, each Client will block when attempting to Publish an event. Because the client blocks, no more events are collected by the harvester, as a harvester is mostly a simple loop executing: read next line -> publish line as event.
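As a simplified illustration (not the actual Beats implementation), the following Go sketch shows how a bounded shared queue propagates back pressure up to the harvester loop: Publish blocks as soon as the queue is full, so the harvester stops reading further lines until the consumer catches up.

```go
package main

import (
	"bufio"
	"fmt"
	"strings"
	"time"
)

type pipeline struct {
	queue chan string // bounded shared queue
}

// Publish blocks once the queue is full -- this is where back pressure arises.
func (p *pipeline) Publish(event string) { p.queue <- event }

// harvest is the harvester loop: read next line -> publish line as event.
func harvest(p *pipeline, sc *bufio.Scanner) {
	for sc.Scan() {
		p.Publish(sc.Text())
	}
}

func main() {
	p := &pipeline{queue: make(chan string, 2)}
	input := strings.NewReader("line1\nline2\nline3\nline4\n")

	go harvest(p, bufio.NewScanner(input))

	// A slow consumer stands in for a slow output such as Logstash.
	for i := 0; i < 4; i++ {
		time.Sleep(200 * time.Millisecond)
		fmt.Println("consumed:", <-p.queue)
	}
}
```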

The publisher pipeline can slow down a data collector by blocking it every now and then. This is what my PR is supposed to do: it adds a 'scheduler' to the Client. To avoid generating unnecessary work, I put the scheduling policy before the event processing. With the scheduler in front of the event processing, a user cannot create a huge amount of work (high CPU usage) for events that are then (almost) all dropped anyway. One can argue that we should still have a scheduling policy after the event processing as well, so as to limit incoming work as well as the amount of outgoing events.
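A rough sketch of that idea, using illustrative names rather than the PR's actual types: the policy's Wait call sits in Publish before any event processing, so a throttled producer is blocked before it can spend CPU on processing.

```go
package main

import (
	"context"
	"fmt"

	"golang.org/x/time/rate"
)

// schedulingPolicy is a stand-in interface; *rate.Limiter happens to satisfy it.
type schedulingPolicy interface {
	Wait(ctx context.Context) error
}

type client struct {
	policy  schedulingPolicy
	process func(string) string // stands in for normalization/processors
	publish func(string)        // stands in for pushing to the shared queue
}

func (c *client) Publish(ctx context.Context, raw string) error {
	// Apply the policy *before* processing: a throttled input burns no CPU here.
	if err := c.policy.Wait(ctx); err != nil {
		return err
	}
	c.publish(c.process(raw))
	return nil
}

func main() {
	c := &client{
		policy:  rate.NewLimiter(100, 1),
		process: func(s string) string { return `{"message":"` + s + `"}` },
		publish: func(s string) { fmt.Println("queued:", s) },
	}
	_ = c.Publish(context.Background(), "hello")
}
```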

Schedulers are meant to have 'global' state. This is why I introduced the notion of 'groups' in my PR. If a scheduling group is referenced, then all inputs (pipeline clients) are subject to the QoS rules of the shared group. In a multi-tenant setup a group can have multiple inputs, so we need this shared state to keep a group/organisation operating within its limits. Using groups and per-input/prospector scheduling policies, we can build a tree of policies, each potentially limiting throughput (via blocking). This allows groups to further split their quota asymmetrically between inputs and sub-groups. This is also why dynamic QoS rules (e.g. weighted fair queuing) would be a better fit than pure rate limiting.
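As a simplified sketch of the policy tree (using plain rate limiters instead of weighted fair queuing, and with made-up names): every input must pass both the group's shared limiter and its own limiter, so all inputs of a tenant draw from one budget.

```go
package main

import (
	"context"

	"golang.org/x/time/rate"
)

// policyChain is a made-up type: a path from the root of the policy tree down
// to an input, e.g. [group limiter, input limiter].
type policyChain []*rate.Limiter

// Wait blocks until every limiter along the path allows one more event.
func (c policyChain) Wait(ctx context.Context) error {
	for _, l := range c {
		if err := l.Wait(ctx); err != nil {
			return err
		}
	}
	return nil
}

func main() {
	group := rate.NewLimiter(1000, 1000) // shared budget for the tenant/group

	// Both inputs reference the same group limiter, so they share its budget.
	inputA := policyChain{group, rate.NewLimiter(800, 800)}
	inputB := policyChain{group, rate.NewLimiter(800, 800)}

	ctx := context.Background()
	_ = inputA.Wait(ctx)
	_ = inputB.Wait(ctx)
}
```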

Each Beat needs to acquire an event, which is then forwarded to libbeat. For example, Filebeat (especially with multiline enabled) has work to do itself before publishing an event, namely collecting the lines and creating the event. Only once we have an event can Filebeat be blocked by libbeat. The blocking model in libbeat falls short if Filebeat harvesters drop many events early (the user can still create huge IO/CPU load) or if multiline is used. The libbeat blocking model also does not take event sizes into account. But if we had a scheduling policy in the harvester/reader as well, we could block Filebeat from even reading an event, based on operation count or bytes consumed. Basically, by blocking the file reader for short periods we could rate-limit the harvesters as well.
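A hypothetical sketch of what such a reader-level limit could look like (none of this exists in Filebeat today): wrapping the file reader so it blocks once a byte budget is exhausted means the harvester never even reads data while its input is over the limit.

```go
package main

import (
	"context"
	"fmt"
	"io"
	"strings"

	"golang.org/x/time/rate"
)

// throttledReader is a made-up wrapper that limits how many bytes per second
// may be read from the underlying reader.
type throttledReader struct {
	r       io.Reader
	limiter *rate.Limiter // budget in bytes per second
}

func (t *throttledReader) Read(p []byte) (int, error) {
	n, err := t.r.Read(p)
	if n > 0 {
		// Block until the byte budget covers what was just read.
		if werr := t.limiter.WaitN(context.Background(), n); werr != nil {
			return n, werr
		}
	}
	return n, err
}

func main() {
	src := strings.NewReader(strings.Repeat("log line\n", 30))
	tr := &throttledReader{r: src, limiter: rate.NewLimiter(256, 256)} // ~256 B/s

	buf := make([]byte, 64)
	for {
		n, err := tr.Read(buf)
		fmt.Print(string(buf[:n]))
		if err != nil {
			break
		}
	}
}
```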

So in your PR comment for your example RateLimit scheduling policy, you wrote:

Input 3 enforces a limit of 500eps per harvester.

Is this actually the case if we have one beat.Client instance per input/prospector instead of one beat.Client per harvester?

Is this actually the case if we have one beat.Client instance per input/prospector instead of one beat.Client per harvester?

Nice catch. I'd say my comment is somewhat wrong. The way filebeat currently works, the limit is not per harvester, but per input.

One input configuration can create an unknown number of harvesters. As users cannot configure the number of harvesters, I'd say it makes sense for Filebeat inputs to always apply the limit/policy to the complete input, such that the 500 eps would be a shared bandwidth limit for all harvesters created by that input. Enforcing these semantics also reduces the amount of per-Beat implementation detail/abstraction leaking into configs.
