Custom Output stops receiving batches after errors

Hi all,
I have a custom output (writing to Azure Event Hubs), and when I introduce network failures that cause the output to return errors, it stops receiving batches from Beats. Beats still appears to be running and collecting metric/file events; however, they never get sent to the output. Below is my Publish(...) code:

func (a *amqpClient) Publish(batch publisher.Batch) error {
  fmt.Println("output.Publish()")
  st := a.observer
  events := batch.Events()

  st.NewBatch(len(events))

  if len(events) == 0 {
    batch.ACK()
    return nil
  }

  for i := range events {
    err := a.publishEvent(&events[i])
    if err != nil {
      events = events[i:]

      // Return events to pipeline to be retried
      batch.RetryEvents(events)
      logp.Err("Failed to publish events caused by: %s", err.Error())

      // Record Stats
      st.Acked(i)
      st.Failed(len(events))
      return err
    }
  }

  // Ack that the batch has been sent
  batch.ACK()

  // Record stats
  st.Acked(len(events))
  return nil
}

And I created the output Group like this: return outputs.Success(1, -1, a)

If anyone has any ideas I'd really appreciate the help!
Justin

The signaling on the batch looks ok. Did you implement a Connect method? If so, you have a NetworkClient, which is driven by this loop: https://github.com/elastic/beats/blob/master/libbeat/publisher/pipeline/output.go#L61

On error, the network client's sending loop stops and the reconnect loop is started. Only after Connect succeeds will the client instance be used for sending new events again. Every active client participates in load-balancing (if multiple hosts are configured). If the connection is invalid/inactive, the client should notify the pipeline by returning errors from Publish and Connect. For as long as it returns errors, the client instance is excluded from load-balancing. That is, after an error you might consider some steps to figure out whether your connection is actually valid, and maybe introduce a circuit-breaker.

In case your client has no Connect method, it must not return an error from Publish. Clients without Connect are assumed not to be recoverable on error. They are driven by this loop: https://github.com/elastic/beats/blob/master/libbeat/publisher/pipeline/output.go#L44

If that's the case (your lib handles all the reconnect magic for you), an error is a 'fatal' signal. If your lib can recover, just signal batch.RetryEvents, but do not return an error. Please note, a client instance that never returns an error actively participates in load-balancing. You can use batch.Cancelled() to return the batch to the shared work queue without updating the events' TTL. Cancelled should only be used to notify the pipeline of events that cannot be published due to normal behaviour (an error return is not normal behaviour). This is used by the network client to lazily connect to an endpoint. Lazy connection is required so as not to block Beats startup in case an endpoint is slow or unresponsive.

A blocked client does not participate in load-balancing, so e.g. on failure you might want to introduce some kind of back-off behaviour (in case it's not provided by the lib). Without back-off you get active waiting: a stuck output might end up busily moving events between the work queue and the actual output, generating high CPU load.

So my output doesn't use the NetworkClient system to maintain the connection. The library that I'm using can renegotiate the connection on its own every time Send(...) is called. It sounds like what I should be doing, though, is calling batch.RetryEvents(failedEvents) and then returning nil instead of the error. Is that correct?

Right, you should use return nil in this case.

Can Send ever block or back off if renegotiation fails between send attempts? If not (no back-off), your beat might end up using 100% CPU if the output keeps failing over and over again.

There's a rudimentary retry policy in place that I think is good enough for now. I'm in contact with the library developer and we're putting together something for greater control of retries.

Thanks again for your help!

Btw., if your lib implements its own retry policy, consider disabling the retry in outputs.Group by setting Retry to 0. If your lib is still prone to dropping events despite its retry policy, set the number of retries to a very large value, and use Retry == -1 if the user configures retry: -1.

Filebeat/winlogbeat set the 'guaranteed' flag when publishing events. This still enforces infinite retry.

With the “infinite retry” approach, events might still get lost on a filebeat/winlogbeat process restart (due to a host restart).
Am I wrong about that?

The outputs must ACK an event. If an event is not ACKed, it is retried. Infinite retry means the event is never dropped. Once an event has been ACKed, the 'next' status is forwarded to the filebeat registry, which updates the registry file. That is, on restart filebeat/winlogbeat continues from the last not-yet-ACKed event. When filebeat/winlogbeat is stopped correctly, a registry state update is serialized to disk. On power failure, a registry update might go missing, but the effect of this will be duplicates, not data loss.
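The at-least-once semantics described here can be sketched with a toy registry (illustrative only, not the real filebeat registry format): only the ACKed offset is persisted, so a crash after sending but before ACKing leads to redelivery rather than loss.

```go
package main

import "fmt"

// registry persists the offset of the next not-yet-ACKed event.
type registry struct{ offset int }

// run delivers events starting at the registry offset; only the ACKed
// prefix advances the offset. A crash before an ACK is persisted means
// the same events are delivered again after restart: duplicates, not loss.
func run(events []string, reg *registry, ackUpTo int) []string {
	delivered := events[reg.offset:]
	reg.offset += ackUpTo
	return delivered
}

func main() {
	events := []string{"e1", "e2", "e3", "e4"}
	reg := &registry{}

	first := run(events, reg, 2)  // all four sent, but only e1,e2 ACKed before a crash
	second := run(events, reg, 2) // after restart: resume at e3 -> e3,e4 redelivered
	fmt.Println(first, second)    // [e1 e2 e3 e4] [e3 e4]
}
```

e3 and e4 show up in both runs: that is the duplicate-on-crash behaviour, while nothing is dropped.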

If the log file is being removed between filebeat restarts, then logs will be lost for sure.

Steffen,
Thanks for clarification!

I think, it might be possible to avoid duplicates if one can find and set proper event ID.
For a log file it can be a hash (md5) of the 1st line plus the line number the event was constructed from. The ID should work even if the file was already renamed (rotated).
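A sketch of that ID scheme (purely illustrative; the function name is made up): the file name is deliberately left out of the hash, so the ID stays stable across rename-based rotation as long as the content is unchanged.

```go
package main

import (
	"crypto/md5"
	"fmt"
)

// eventID derives a stable ID from the file's first line and the line
// number the event came from. Because the file name is not part of the
// hash, the ID survives rename-based rotation of the same file.
func eventID(firstLine string, lineNo int) string {
	sum := md5.Sum([]byte(firstLine))
	return fmt.Sprintf("%x-%d", sum, lineNo)
}

func main() {
	// Same first line and line number -> same ID, regardless of file name.
	a := eventID("2019-01-01 app started", 42)
	b := eventID("2019-01-01 app started", 42)
	fmt.Println(a == b)                                   // true
	fmt.Println(eventID("2019-01-01 app started", 43) == a) // false
}
```

Downstream, such an ID can be used as the document `_id` so redelivered duplicates overwrite instead of duplicating.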
