Beats' newfound love for Kafka should (also) be synchronous

I didn't raise a GitHub issue, but I was reviewing Beats' newly implemented Kafka output and noticed its use of Sarama's async producer.

This is great, but I have a strong suspicion it will cause message loss when agents lose contact with the broker (even when acks are set to wait for all brokers).

The reason is that Sarama's async producer is non-blocking: it releases the clients (the beat, in this case) to keep doing their job (https://www.reddit.com/r/golang/comments/2wzc0r/apache_kafka_client_library_for_go/cp7onub) while pumping failed messages into an error channel.
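To illustrate, here is a minimal sketch of plain Sarama usage (not beats code; broker address and topic are made up):

package main

import (
    "log"

    "github.com/Shopify/sarama"
)

func main() {
    config := sarama.NewConfig()
    config.Producer.Return.Errors = true // failed sends come back on Errors()

    producer, err := sarama.NewAsyncProducer([]string{"broker1:9092"}, config)
    if err != nil {
        log.Fatal(err)
    }
    defer producer.Close()

    // Failures only show up later, on the error channel.
    go func() {
        for perr := range producer.Errors() {
            log.Printf("send failed: %v", perr.Err)
        }
    }()

    // Input() accepts the message immediately; the caller is never blocked
    // waiting for the broker to ACK anything.
    producer.Input() <- &sarama.ProducerMessage{
        Topic: "events",
        Value: sarama.StringEncoder("hello"),
    }
}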

Although the behavior is configurable, the async producer is not as generous as one might expect; the config only allows for the following retry settings:

Retry struct {
    // The total number of times to retry sending a message (default 3).
    // Similar to the `message.send.max.retries` setting of the JVM producer.
    Max int
    // How long to wait for the cluster to settle between retries
    // (default 100ms). Similar to the `retry.backoff.ms` setting of the
    // JVM producer.
    Backoff time.Duration
}
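For reference, these map onto Sarama's Config roughly like this (the values are just examples):

import (
    "time"

    "github.com/Shopify/sarama"
)

func producerConfig() *sarama.Config {
    config := sarama.NewConfig()
    config.Producer.RequiredAcks = sarama.WaitForAll       // "wait for all brokers"
    config.Producer.Retry.Max = 10                         // raise from the default 3
    config.Producer.Retry.Backoff = 500 * time.Millisecond // default is 100ms
    return config
}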

The main impact of this is to hinder the use of Kafka as a delivery mechanism for remote/internet-connected producers that are just too far away from a DC to guarantee a stable connection to the brokers. If that connection drops, Sarama starts spitting errors back into the channel, and unless you requeue them, messages can be lost big time.
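So instead of just logging failures as in the sketch above, the client has to drain the error channel and requeue itself, something like this (naive sketch, reusing the producer from above, with no extra backoff or give-up policy):

// Requeue failed messages ourselves; without this they are simply
// dropped once Sarama has exhausted Retry.Max attempts.
go func() {
    for perr := range producer.Errors() {
        log.Printf("requeueing after error: %v", perr.Err)
        producer.Input() <- perr.Msg
    }
}()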

IMHO it would be a good approach to make Beats' Kafka output configurable so that users can choose between Sarama's async and sync producers.
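For comparison, this is roughly what the sync path looks like: SendMessage blocks until the broker has ACKed (or retries are exhausted), so the caller knows right away whether it is safe to move on (sketch only):

config := sarama.NewConfig()
config.Producer.RequiredAcks = sarama.WaitForAll
config.Producer.Return.Successes = true // the SyncProducer requires this

producer, err := sarama.NewSyncProducer([]string{"broker1:9092"}, config)
if err != nil {
    log.Fatal(err)
}
defer producer.Close()

// Blocks until ACK or final failure.
partition, offset, err := producer.SendMessage(&sarama.ProducerMessage{
    Topic: "events",
    Value: sarama.StringEncoder("hello"),
})
if err != nil {
    log.Printf("send failed, don't advance the read position: %v", err)
} else {
    log.Printf("stored at partition=%d offset=%d", partition, offset)
}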

Any thoughts?

It's true, we're using the AsyncProducer, but libbeat is not relying on sarama's AsyncProducer for handling failures. It's only used for pipelining (multiple batch requests on the network before the first ACK) for better throughput.

The error pipeline is used and all failed send attempts are requeued until max_retries (configurable in the beats config file). Error handling is still dealt with in libbeat. The implication for filebeat is that a batch is resent until ACKed.

By default filebeat itself is synchronous (waiting for the ACK from kafka), in a similar fashion to the SyncPublisher (which is basically a wrapper around the AsyncPublisher), as the publisher can decide in PublishEvent(s) to be synchronous or asynchronous via options. This would only become a concern if the library responded with success for a failed send attempt (which I would treat as a bug).
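To make the "sync is just a wrapper around async" point concrete, conceptually it is something like this (purely illustrative, made-up names, not the actual libbeat code):

// Illustrative only: a synchronous publish built on top of an
// asynchronous one by blocking on a completion signal, which is
// roughly how the SyncPublisher wraps the AsyncPublisher.
type Event map[string]interface{}

func publishSync(publishAsync func(batch []Event, done chan bool)) func([]Event) bool {
    return func(batch []Event) bool {
        done := make(chan bool, 1)
        publishAsync(batch, done) // hand the batch off without blocking
        return <-done             // ...then wait for the ACK (or final failure)
    }
}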

The future plan is to ditch the AsyncProducer, as sarama duplicates too much functionality from libbeat (in a slightly different way), and go with the more low-level Broker type in the sarama library, but this requires some more enhancements to libbeat internals first to deal with broker metadata updates.
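For context, going low-level means talking to sarama's Broker type directly and letting libbeat drive retries, partitioning and metadata; very roughly (sketch only, error handling and metadata updates left out):

broker := sarama.NewBroker("broker1:9092")
if err := broker.Open(sarama.NewConfig()); err != nil {
    log.Fatal(err)
}
defer broker.Close()

req := &sarama.ProduceRequest{RequiredAcks: sarama.WaitForAll, Timeout: 10000}
req.AddMessage("events", 0, &sarama.Message{Value: []byte("hello")})

// The response reports success/failure per topic/partition, so the
// caller (libbeat) decides what to retry and where.
resp, err := broker.Produce(req)
if err != nil {
    log.Printf("produce failed: %v", err)
}
_ = resp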

Backoff on errors is also handled by libbeat; unfortunately it's not properly initialized right now and needs to be fixed.

Thank you for the response. I saw the re-batching call after you update the counters, but I was curious about how libbeat deals with queue batching.

The classic scenario I have in mind is: tail a massive file and pipe it into Kafka. How do you guarantee the tail seek/position isn't updated prior to the successful delivery of a message? I would imagine that, at the bare minimum, beats would tell the producing beat (e.g. filebeat) to stop sending data?

NiFi does this by splitting the task in two. Its tail processor just reads the files and delivers them to the framework, updating the persistence file once the data is acknowledged by the framework; the reverse applies when data leaves NiFi for Kafka. The data flow is only ACKed once Kafka has ACKed, but their framework seems to offer "hop-by-hop" guarantees.

Do you persist those error batches on disk or are they kept in memory? My understanding was that beats do not offer delivery guarantees for data held within their pipelines?

Check out publish.go in filebeat; it introduces a syncLogPublisher and an asyncLogPublisher. These publishers wrap libbeat publisher functionality, forwarding ACKed send requests to the registrar, which updates the internal 'database' (basically a JSON file) of known file states. On restart filebeat will read the registrar database and continue processing files from the last offset ACKed by the publisher pipeline.
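The core idea is: only persist a file's offset after the publisher pipeline has ACKed the batch it belongs to, so a restart resumes from the last ACKed position at worst. Heavily simplified illustration (not the actual registrar code):

import (
    "encoding/json"
    "io/ioutil"
)

// Simplified sketch: persist the offset per file only after the
// corresponding batch was ACKed.
type registrar struct {
    path    string
    offsets map[string]int64 // file -> last ACKed offset
}

func (r *registrar) onACK(file string, offset int64) error {
    r.offsets[file] = offset
    data, err := json.Marshal(r.offsets)
    if err != nil {
        return err
    }
    // The real thing is more careful (write to a temp file, then rename).
    return ioutil.WriteFile(r.path, data, 0644)
}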

Right now the publisher pipeline ACKing a publish request relies on the sink (kafka/elasticsearch/logstash) ACKing the send attempt.

PublishEvent/PublishEvents support some options (Guaranteed, Sync, Signal) to get some feedback/guarantees on send attempts. See the usage in filebeat here for the syncLogPublisher and here for the asyncLogPublisher.

The catch with the async publisher plus potential load-balancing for filebeat is that we cannot guarantee any ordering, but filebeat needs to update the registrar database in order. Thus the asyncLogPublisher holds a list (sorted by time) of published batches and marks a batch as ACKed via the ACK callback. Because the Guaranteed option is passed, publish will only send a fail response if the beat/publisher is being shut down (in short, the publisher pipeline retries until ACK). Every now and then the asyncLogPublisher collects the ACKed publish requests in order, so it can inform the registrar of the file-state updates.
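The bookkeeping is roughly the following (illustrative sketch with made-up names, not the actual asyncLogPublisher):

import "sync"

// Batches are published asynchronously and may complete out of order,
// but only the longest ACKed prefix is forwarded to the registrar.
type batch struct {
    lastOffset int64
    acked      bool
}

type ackCollector struct {
    mu      sync.Mutex
    pending []*batch // in publish order
}

func (c *ackCollector) publish(b *batch) {
    c.mu.Lock()
    c.pending = append(c.pending, b)
    c.mu.Unlock()
    // The batch is then published with a Guaranteed + Signal-style option;
    // the ACK callback eventually calls c.onACK(b), possibly out of order.
}

func (c *ackCollector) onACK(b *batch) {
    c.mu.Lock()
    b.acked = true
    c.mu.Unlock()
}

// collect returns the highest offset whose batch and all earlier
// batches were ACKed; this is what the registrar gets told.
func (c *ackCollector) collect() (offset int64, ok bool) {
    c.mu.Lock()
    defer c.mu.Unlock()
    for len(c.pending) > 0 && c.pending[0].acked {
        offset, ok = c.pending[0].lastOffset, true
        c.pending = c.pending[1:]
    }
    return offset, ok
}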

I'm considering moving support for ordered ACK/fail signaling into the libbeat publisher, so the functionality can potentially be reused via a new option (e.g. OrderedSignal).

For other beats like topbeat/packetbeat there are no guarantees; after max_retries (default: 3) events will be dropped. By setting max_retries to -1 you enable the Guaranteed send mode, but due to the nature of packetbeat/topbeat you will basically lose events on the 'other end'. E.g. topbeat cannot collect new events if queues are saturated, and packetbeat (internally) just drops events once queues are saturated. Different beats have different requirements, and we have to ensure libbeat can provide support for these requirements.
