Flushing after yourself

The pipeline is setup by libbeat. The operator can configure how exactly the pipeline will be configured (we just added spooling to disk). You should not have to setup the pipeline in your own beat.

The pipeline has a producer/client, which is your own beat. And it has a consumer, which are the outputs.

The pipeline decouples the beat from the outputs.
The interface between the pipeline and the beat mandates:

  • producer publishes single events
  • pipeline returns ordered ACK to producer or global ACK handler.
    • this means, your ACK handler will be invoked in the same order you have published events. Using ACKCount, the count returned is the first N events you have published. No out of order signaling
  • pipeline always ACKs events. From producer point of view, there is no event drop.
    => event drops, infinite retry, is configured and handled in the publisher pipeline.

The interface between pipeline and outputs works a little different:

  • ACK can be out of order
  • ACK can be asynchronous at any time
  • pipeline distributes batches of events to outputs
  • outputs never drop events, but report errors/retry => pipeline configuration (that is the operator) decides if events will be dropped or retried.

There are a few hooks one can use. The hooks guarantee you all events being reported to your callback have been actually ACKed by an output (or dropped, depending on internal configuration).

If I understand you correctly, you want to 'remember' the ID of the next non-ACKed event on shutdown. This is a similar problem to filebeat. In filebeat we want to store the offset of the next event in the registry file, not the offset of last ACKed event, so we don't send the event again. This is solved in Filebeat by storing the next offset to read from in (beat.Event).Private. Filebeat installs a global ACK handler with the pipeline, and forwards the 'next' state to the registrar after 'casting' call values to file.State. The ack handler gets the 'last' ACKed event for each producer in one batch.

When publishing event i, do you already now the ID of event i+1? If so, you can store the ID of event i+1 with the ith event Private field and just do as filebeat does.

If you can not tell the next ID in advance, a simple solution would be to do 'deduplication' in your beat on startup. Just store the last ACKed event ID. On startup read the same event again, but drop it immediately (don't forward to libbeat). What's nice about this approach is, if your data source is empty and you can not read any more events, the current events ID is still valid and allows you to restart on an data source with all events already being processed.

Alternatively you can use ACKCount. It's a local callback one configures when connecting to the pipeline. Internally the other event based ack handlers actually re-use the ACKCount callback, as ACKCount guarantees the callback is being called in order. Using ACKCount, the event based ack handlers store an array of active events. Once the ack handler is called, the array of events is used to determine already ACKed events and pending events. The acked events are removed from the array. Using the approach, the first event in your array is the actual pending one. Some sample code:

type producer struct {
  client beat.Client

  mu sync.Mutex
  pending []events
}

// init code, connection to the publisher pipeline:
func newProducer(config Config, pipeline beat.Pipeline) (*producer, error) {
  ...

  p := &producer{}

  client, err := pipeline.ConnectWith(beat.ClientConfig{
    PublishMode: GuaranteedSend, // configure infinite retry
    EventMetadata: config.EventMeta, // producer local `fields` and `tags` settings
    Processor: config.Processor, // producer local event processors
    WaitClose: config.WaitClose, // allow user to configure additional wait on shutdown
    ACKCount: p.onACK, // ack handler
  })
  
  if err != nil {
    return nil, err
  }

  p.client = client
  return p, nil
}

func (p *producer) onACK(n int) {
  p.mu.Lock()
  pending := p.events[n:]
  p.events = pending
  p.mu.Unlock()

  if len(pending) > 0 {
    p.storeNext(pending[0].ID()) // always forward 'continuation' state for serialization in case the beat crashes or machine has a power outage
  }
}

// next returns the id of the pending event to continue processing after restart:
func (p *producer) next() string {
  p.mu.Lock()
  defer p.mu.Unlock()
  if len(p.events) == 0 {
    return ""
  }
  return p.events[0].ID()
}

func (p *producer) publish(event *event) {
  p.mu.Lock()
  p.events = append(p.events, event)
  p.mu.Unlock()
  
  // publish event to libbeat
  p.client.Publish(...)
}

As one wants to continue processing from last ACKed event, kill -9 or power outage should be concern as well. In filebeat/winlogbeat the registry file is written every now and then.