Thank you for the test code.
I modified your test code a little:
```go
type test struct {
    done     chan struct{}
    config   config.Config
    client   beat.Client
    printACK atomic.Bool
}
```
...
```go
func (bt *test) ackLastEvent(data interface{}) {
    if data != nil {
        if lastAckID, ok := data.(int); ok {
            if bt.printACK.Load() || lastAckID == lastPublishedID {
                logX.Infof("==> Last Published event IDs is ACKed: %d", lastAckID)
            }
        }
    }
    if !isRegistryOpen {
        logX.Warn("==> Cannot update registry...")
    }
}
```
...
```go
// replace `defer bt.client.Close()` with
defer func() {
    bt.printACK.Store(true)
    logX.Info("closing beat.Client")
    bt.client.Close()
    logX.Info("closed beat.Client")
}()
```
This change prints some more status before and after closing (including timestamps, thanks to the logger). Before calling `Close`, the `printACK` flag is set to true, forcing the ACK handler to print all ACKs that happen while `Close` blocks.
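For reference, here is a minimal sketch of how the ACK handler and a per-client `WaitClose` might be wired up when connecting to the pipeline. The `beat.ClientConfig` fields are libbeat's, but the surrounding beater details (`b.Publisher`, `bt.config.WaitClose`) are assumptions about the test setup rather than code from the original test:

```go
// Sketch: connect with a per-client WaitClose and an ACK callback.
// ACKLastEvent receives the Private data of the last event in each ACKed
// batch, so events are expected to be published with Private set to their ID.
client, err := b.Publisher.ConnectWith(beat.ClientConfig{
    WaitClose:    bt.config.WaitClose,
    ACKLastEvent: bt.ackLastEvent,
})
if err != nil {
    return err
}
bt.client = client
```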
Due to having timeouts on close and never blocking forever, there is always the chance of race conditions. Libbeat assumes the worker state is invalidated after `beat.Client.Close`. That is, any local ACKs arriving after `bt.client.Close` returns will be filtered out (the local callback will not be run anymore).
All in all, the tests ran fine for me. That said, I was publishing to /dev/null, so the output never really created back-pressure. Back-pressure from the output can affect the timing as well, potentially causing some ACKs to be missed because the client is already closed.
I ran the beat with `./testbeat -e -v -c test.yml > /dev/null` (logs are printed to stderr).
For this configuration:
```yaml
test:
  EventCount: 1000000
  WaitClose: 3s
output.console.pretty: true
queue.mem:
  flush.timeout: 1s
```
I consistently got this output:
```
2018-05-10T14:07:13.391+0200  INFO  [test]  beater/test.go:74   ==> Events published: 100999999
2018-05-10T14:07:13.391+0200  INFO  [test]  beater/test.go:99   closing beat.Client
2018-05-10T14:07:13.408+0200  INFO  [test]  beater/test.go:46   ==> Last Published event IDs is ACKed: 100999423
2018-05-10T14:07:14.401+0200  INFO  [test]  beater/test.go:46   ==> Last Published event IDs is ACKed: 100999999
2018-05-10T14:07:14.401+0200  INFO  [test]  beater/test.go:101  closed beat.Client
```
What's interesting about this output is the delay: checking the timestamps, there is a noticeable gap of about 1 second between the last two ACKs. The memory queue buffers up to 2048 events before flushing the buffer and forwarding the batch to the outputs. With the given configuration, the last batch holds only around 500 events (1,000,000 mod 2048 = 576, which matches the gap between the last two ACKed IDs), so we have to wait for the flush timeout.
Btw. one can set `flush.timeout: 0s` in order to stream much smaller batches directly to the outputs.
Changing my queue setting in the config file to this:
```yaml
queue.mem:
  flush.timeout: 10s
```
Then ACKs were indeed missing:
```
2018-05-10T14:06:45.511+0200  INFO  [test]  beater/test.go:99   closing beat.Client
2018-05-10T14:06:45.528+0200  INFO  [test]  beater/test.go:46   ==> Last Published event IDs is ACKed: 100999423
2018-05-10T14:06:48.513+0200  INFO  [test]  beater/test.go:101  closed beat.Client
```
As the queue batching up events is shared between all `beat.Client` instances, the queue can't be flushed just because one `beat.Client` is closing. That is, this is intended behavior. The global ACK handling and WaitClose functionality (which is not exposed yet) should not suffer from this problem.
My recommendation for now would be:
- Use a local `WaitClose` and set `WaitClose` as big as possible. E.g. systemd waits (it's configurable) up to 30s before forcing a shutdown; in that case I would set `WaitClose` to 25s or even 28s. This also helps in case your output is blocking for a short amount of time.
- Wait until registry file support and the global `WaitClose` become available via libbeat. Still, we are working with timeouts, so there will always be some races between the network and the internal pipelines.
- Set an `id` field via `Meta`. This will use your custom ID as the document ID when indexing. The Elasticsearch output uses the `create` operation when indexing events with a custom `id`, so events whose ID has already been indexed are rejected by Elasticsearch, giving you some deduplication behavior. This also helps in case of temporary network errors: the beat will have to retry sending events, potentially generating duplicates if no `id` is present. See the sketch after this list.
Your sample code already creates an `id`. Also make sure this `id` is really unique. When reading from a database, the `id` could be built from the database source name + table name + row ID, e.g. `<hash of db url + table name>-<row id>`.
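Here is a minimal sketch of what publishing with such a custom `id` could look like. Setting the ID under `Meta` follows the recommendation above; the helper name `makeDocumentID`, the database identifiers (`dbURL`, `tableName`, `rowID`), and the hashing scheme are illustrative assumptions, not code from your beat:

```go
package beater

import (
    "crypto/sha256"
    "encoding/hex"
    "fmt"
    "time"

    "github.com/elastic/beats/libbeat/beat"
    "github.com/elastic/beats/libbeat/common"
)

// makeDocumentID builds a deterministic ID from the database URL, table name,
// and row ID, so a retried event produces the same document ID again.
func makeDocumentID(dbURL, tableName string, rowID int64) string {
    sum := sha256.Sum256([]byte(dbURL + "/" + tableName))
    return fmt.Sprintf("%s-%d", hex.EncodeToString(sum[:8]), rowID)
}

// publishRow publishes one row with the custom document ID in Meta and the
// numeric row ID in Private, so the ACK callback gets the ID reported back.
func publishRow(client beat.Client, dbURL, tableName string, rowID int64, fields common.MapStr) {
    client.Publish(beat.Event{
        Timestamp: time.Now(),
        Meta: common.MapStr{
            "id": makeDocumentID(dbURL, tableName, rowID),
        },
        Fields:  fields,
        Private: int(rowID), // matches the data.(int) assertion in ackLastEvent
    })
}
```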