Hi all,
I have a custom output (writing to Azure Event Hubs), and when I introduce network failures, causing the output to return errors, it stops receiving batches from Beats. Beats still appears to be running and receiving metric/file events; however, they never get sent to the output. Below is my Publish(...) code:
func (a *amqpClient) Publish(batch publisher.Batch) error {
	fmt.Println("output.Publish()")
	st := a.observer
	events := batch.Events()
	st.NewBatch(len(events))
	if len(events) == 0 {
		batch.ACK()
		return nil
	}
	for i := range events {
		err := a.publishEvent(&events[i])
		if err != nil {
			// Return the unsent events to the pipeline to be retried
			events = events[i:]
			batch.RetryEvents(events)
			logp.Err("Failed to publish events caused by: %s", err.Error())
			// Record stats
			st.Acked(i)
			st.Failed(len(events))
			return err
		}
	}
	// ACK that the batch has been sent
	batch.ACK()
	// Record stats
	st.Acked(len(events))
	return nil
}
And I created the output Group like this: return outputs.Success(1, -1, a)
If anyone has any ideas I'd really appreciate the help!
Justin
On error, the network client's sending loop stops and the reconnect loop is started. Only after Connect succeeds will the client instance be used for sending new events again. Every active client participates in load-balancing (if multiple hosts are configured). If the connection is invalid/inactive, the client should notify the pipeline by returning errors from Publish and Connect. For as long as an error is returned, the client instance is not used for load-balancing. That is, after an error you might consider some steps to figure out whether your connection is actually valid, and maybe introduce a circuit breaker.
If that's the case (i.e. your lib handles all the magic for you), an error is a 'fatal' signal. If your lib can recover, just signal batch.RetryEvents, but do not return an error. Please note: a client instance that never returns an error actively participates in load-balancing. You can use batch.Cancelled() to return the batch to the shared work queue without updating the events' TTL. Cancelled should only be used to notify the pipeline of events that can not be published due to normal behaviour (an error return is not normal behaviour). This is used by the network client to lazily connect to an endpoint. Lazy connection is required so as not to block Beats startup in case an endpoint is slow or unresponsive.
A blocked client does not participate in load-balancing. E.g. on failure you might want to introduce some kind of back-off behaviour (in case it's not provided by the lib). Without back-off, a stuck output that is actively waiting might end up constantly moving events between the work queue and the actual output, generating high CPU load.
So my output doesn't use the NetworkClient system to maintain the connection. The library that I'm using can renegotiate the connection on its own every time Send(...) is called. It sounds like what I should be doing, though, is calling batch.RetryEvents(failedEvents) and then returning nil instead of returning the error. Is that correct?
Can Send ever block if renegotiation fails between send attempts? If not (no back-off), your Beat might end up using 100% CPU if the output is failing over and over again.
There's a rudimentary retry policy in place that I think is good enough for now. I'm in contact with the library developer and we're putting together something for greater control of retries.
Btw. if your lib implements the retry policy, consider disabling the retry policy in outputs.Group by setting Retry to 0. If your lib is still prone to drop events despite its retry policy, set the number of retries to a very large value, and use Retry == -1 if the user configures retry: -1.
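That mapping could be sketched like this (groupRetry is a hypothetical helper for this example, not part of libbeat; its result would be passed as the retry argument of outputs.Success):

```go
package main

import "fmt"

// groupRetry maps the user-configured retry setting to the value handed to
// the pipeline's output Group. When the library already retries internally,
// 0 disables the pipeline's own retry; -1 means infinite retry.
func groupRetry(userRetry int, libHandlesRetry bool) int {
	if userRetry < 0 {
		return -1 // user asked for infinite retry
	}
	if libHandlesRetry {
		return 0 // let the library's retry policy do the work
	}
	return userRetry
}

func main() {
	fmt.Println(groupRetry(-1, true)) // -1: infinite retry wins
	fmt.Println(groupRetry(3, true))  // 0: library handles retries
	fmt.Println(groupRetry(3, false)) // 3: pipeline retries three times
}
```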
Filebeat/winlogbeat set the 'guaranteed' flag when publishing events. This still enforces infinite retry.
The output must ACK an event. If an event is not ACKed, it is retried. Infinite retry means the event is never dropped. Once an event has been ACKed, the 'next' status is forwarded to the filebeat registry, which updates the registry file. That is, on restart filebeat/winlogbeat continues from the last not-yet-ACKed event. When filebeat/winlogbeat is stopped correctly, a registry state update is serialized to disk. On power failure, a registry update might go missing. But the effect of this will be duplicates, not data loss.
If the log file is being removed between filebeat restarts, then logs will be lost for sure.
I think it might be possible to avoid duplicates if one can find and set a proper event ID.
For a log file it could be a hash (md5) of the 1st line, plus the line number the event was constructed from. The ID should work even if the file was renamed (rotated) already.
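A minimal sketch of that idea (the eventID function and its "hash-lineNo" format are assumptions for illustration, not anything Beats ships):

```go
package main

import (
	"crypto/md5"
	"fmt"
)

// eventID builds a deduplication ID from an md5 hash of the log file's
// first line plus the line number the event came from. Keying on the
// file's first line rather than its name means the ID survives
// rename-based log rotation.
func eventID(firstLine string, lineNo int) string {
	sum := md5.Sum([]byte(firstLine))
	return fmt.Sprintf("%x-%d", sum, lineNo)
}

func main() {
	id := eventID("2019-01-01T00:00:00Z service started", 42)
	fmt.Println(id)
	// The same content yields the same ID even after the file is renamed,
	// so a downstream store can deduplicate re-sent events.
	fmt.Println(id == eventID("2019-01-01T00:00:00Z service started", 42)) // prints: true
}
```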