How to Handle Publication Failures

I'm writing a Beat to pull events from a message queue and publish them via libbeat. If an event fails to publish, I'd like to send it to a dead letter queue. However, from looking at the outputs/elasticsearch/client.go#PublishEvent implementation, I get the impression that publication failure isn't always communicated back to the beat.

Can anyone suggest the right approach for detecting and handling failures? My ultimate goal is to ensure that event data is never lost.

Signaling success/failure is possible with libbeat. I'm currently working on a PR to add support for asynchronous signaling (which is already possible inside libbeat, but not yet exposed in the API). I'll share some details later today, once my PR is ready.

In the meantime, how do you plan to handle failed sends? There has been some discussion about queueing to disk (which would effectively become part of libbeat), e.g. in this github issue (there have been others; I just can't find them right now).

Async signaling would be great. My plan is to just buffer in-flight messages (the option to persist them to disk would be a bonus). Ideally, the failure reporting would provide a way to correlate events with bulk sends (maybe publishEvent(s) returns a batchID?). Then I'd have the option of either nacking the buffered messages or sending them to a dead letter queue.
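Roughly, I'm picturing an in-flight buffer along these lines (a rough sketch only; the names and the local batch-ID scheme are just illustrative, not anything libbeat provides today):

```go
package main

import (
	"sync"

	"github.com/elastic/beats/libbeat/common"
)

// inflight keeps published-but-unacked batches keyed by a locally
// generated ID, so they can be dropped on ack or recovered on nack.
// All names here are illustrative, not a libbeat API.
type inflight struct {
	mu      sync.Mutex
	nextID  uint64
	batches map[uint64][]common.MapStr
}

func newInflight() *inflight {
	return &inflight{batches: make(map[uint64][]common.MapStr)}
}

// add buffers a batch and returns the ID used to correlate the ack/nack.
func (b *inflight) add(batch []common.MapStr) uint64 {
	b.mu.Lock()
	defer b.mu.Unlock()
	b.nextID++
	b.batches[b.nextID] = batch
	return b.nextID
}

// ack drops a successfully published batch.
func (b *inflight) ack(id uint64) {
	b.mu.Lock()
	defer b.mu.Unlock()
	delete(b.batches, id)
}

// nack removes and returns a failed batch, so the caller can either
// nack the source messages or forward them to a dead letter queue.
func (b *inflight) nack(id uint64) []common.MapStr {
	b.mu.Lock()
	defer b.mu.Unlock()
	batch := b.batches[id]
	delete(b.batches, id)
	return batch
}
```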

My PR finally made it into master: https://github.com/elastic/beats/pull/684

There are 3 publisher options one can use:

Sync: If sync is used, PublishEvent and PublishEvents will block until an ACK is received from elasticsearch or logstash.

Disadvantages:

  • longer wait times
  • effectively disables load balancing if only one beat is running (still round-robin, but only one sink at a time)
  • recommended for use with batches of events via PublishEvents only

Advantages:

  • PublishEvents returns a bool indicating success or failure
  • (dramatically) reduces memory usage, as it limits the number of events in flight

Guaranteed: If this option is set, libbeat will retry publishing the events until success; no messages are dropped. Publishing will only fail on shutdown, when internal queues are flushed (so that an unresponsive logstash/elasticsearch instance cannot halt shutdown). This option is fully async. If the queues in libbeat fill up, PublishEvent will block. If load balancing is configured, there is no guarantee on indexing order in elasticsearch or logstash (Sync is required to fully guarantee order).

Signal: This option accepts any type implementing the outputs.Signaler interface. The Completed/Failed callbacks will be called directly from within the output plugin, so to avoid blocking the publisher queue for too long, make sure these callbacks don't do much work.
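To make the options concrete, here is a minimal sketch, assuming the publisher.Client API as exposed by the PR above; sendToDLQ is a hypothetical helper, not part of libbeat:

```go
package main

import (
	"log"

	"github.com/elastic/beats/libbeat/common"
	"github.com/elastic/beats/libbeat/outputs"
	"github.com/elastic/beats/libbeat/publisher"
)

// sendToDLQ is a hypothetical stand-in for a dead letter queue producer.
func sendToDLQ(batch []common.MapStr) {
	log.Printf("dead-lettering %d events", len(batch))
}

// Sync mode: PublishEvents blocks until the batch is ACKed, and the bool
// return value reports success or failure directly.
func publishSync(client publisher.Client, batch []common.MapStr) {
	if ok := client.PublishEvents(batch, publisher.Sync); !ok {
		sendToDLQ(batch)
	}
}

// Guaranteed + Signal: PublishEvents returns immediately, libbeat retries
// until success, and the outcome is reported through the signaler's
// Completed/Failed callbacks.
func publishGuaranteed(client publisher.Client, batch []common.MapStr, s outputs.Signaler) {
	client.PublishEvents(batch, publisher.Guaranteed, publisher.Signal(s))
}
```

Note that with Guaranteed, Failed effectively only fires during shutdown, so the signaler's main job there is to drain still-buffered events (e.g. into the dead letter queue) at that point.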

I'm using Guaranteed + Signal in an experimental filebeat fork. See the publisher in filebeat, which guarantees that success/failure of async batch publishes is reported in sequence to the registrar.

If you don't care about order when handling errors, there is no need for a sorted list + flags. Instead, you can have the Failed callback forward failed batches (or just a batch ID) on another channel to some worker that handles the failures, as sketched below.
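For example (a rough sketch; failForwarder, the channel, and the worker are illustrative names, not libbeat API):

```go
package main

// failForwarder implements outputs.Signaler. Failed only hands the batch
// ID off to a (buffered) channel, so the callback returns quickly and the
// publisher queue is never blocked. All names here are illustrative.
type failForwarder struct {
	batchID uint64
	failed  chan<- uint64
}

func (f *failForwarder) Completed() {} // nothing to do on success

func (f *failForwarder) Failed() {
	f.failed <- f.batchID // cheap hand-off; real work happens in the worker
}

// failWorker consumes failed batch IDs and nacks (or dead-letters) the
// corresponding buffered batches.
func failWorker(failed <-chan uint64, nack func(uint64)) {
	for id := range failed {
		nack(id)
	}
}
```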

I'm looking for common patterns to emerge across different use cases, so we can provide helpers in libbeat that take care of some of this low-level handling. But we're not there yet.

In case your project is open source, I'm very interested to see the final result and to learn about possible improvements to the libbeat publisher.

Thanks for the update, Steffen. That PR is exactly the sort of thing I was looking for. The longer wait time won't be an issue for us, since we're only expecting an event every few seconds.

I'm hoping to open source our beat once it's further along, but I'll post some feedback either way.

My only remaining concern is how the outputs themselves classify failures. For example, it appears that the elasticsearch client ignores most 4XX responses, but I need to do some more investigation to confirm. I'll start a new thread/issue if that turns out to be the case.

Thanks again for your time and work on this.

When using the bulk API, all encodable events are retried upon a direct failure from the elasticsearch node (see line 127). The client returns all events that were not published.

If the HTTP request itself was successful, we check success/failure per item, retrying failed indexing attempts based on the item's HTTP return code (see the loop at line 220).
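In simplified form, the per-item check looks something like this (illustrative only, not the actual client code; the real implementation also distinguishes non-recoverable item errors, see below):

```go
package main

import "github.com/elastic/beats/libbeat/common"

// bulkItemStatus holds the per-item HTTP status from a bulk response.
// This is a simplified illustration of the pattern, not the real client.
type bulkItemStatus struct {
	Status int `json:"status"`
}

// collectFailed returns the events whose bulk items did not index
// successfully, so that only those are retried.
func collectFailed(events []common.MapStr, items []bulkItemStatus) []common.MapStr {
	var failed []common.MapStr
	for i, item := range items {
		if item.Status >= 300 { // this item failed to index
			failed = append(failed, events[i])
		}
	}
	return failed // events to be sent again
}
```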

When using the Guaranteed and Signal options, or Sync with PublishEvents, the bulk API will be used.

When publishing a single event using PublishEvent with Sync, the index API is used (see the error handling at line 339).

There are 3 kinds of results when publishing events, but we currently handle just 2 of them:

  1. success
  2. recoverable error => retry
  3. non-recoverable error (e.g. cannot encode to JSON, or incompatible mapping in elasticsearch) => write an error message to the log + discard the event

Unfortunately, we don't have a way to signal non-recoverable errors in libbeat yet. That's why, upon discarding an event, we signal success (do not retry) in order to keep the pipeline operating.

Thanks, that certainly saved me some legwork. And yeah, it's precisely the non-recoverable errors that concerned me. I want to avoid dropping messages on the floor if and when users happen to misconfigure something.

Are there plans to add signaling for non-recoverable errors? Or are you open to receiving PRs in that area? (I ask this having no concept of how hard it would be).

There are currently no plans for handling non-recoverable errors, although I understand it's an important feature in data processing pipelines, and I hope to have it implemented in future beats releases.

Beats is all open source and we're very happy to receive PRs. Feel free to open a discussion on github first about possible implementations.

Sounds good. I'll journal the messages to disk for now and look into proposing an implementation on github.com.
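Concretely, something like this is what I have in mind for the journal (a rough sketch only; one JSON document per line, no rotation or batched fsync yet):

```go
package main

import (
	"encoding/json"
	"os"

	"github.com/elastic/beats/libbeat/common"
)

// journal appends failed events to a local file, one JSON document per
// line, so nothing is lost across restarts. Sketch only; deliberately
// minimal.
type journal struct {
	file *os.File
	enc  *json.Encoder
}

func openJournal(path string) (*journal, error) {
	f, err := os.OpenFile(path, os.O_CREATE|os.O_APPEND|os.O_WRONLY, 0600)
	if err != nil {
		return nil, err
	}
	return &journal{file: f, enc: json.NewEncoder(f)}, nil
}

// Append encodes the events and syncs the file before returning, so a
// crash right after Append cannot lose them.
func (j *journal) Append(events []common.MapStr) error {
	for _, event := range events {
		if err := j.enc.Encode(event); err != nil {
			return err
		}
	}
	return j.file.Sync()
}
```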

Cheers.