Need some help configuring an ACK handler for a custom beat

First of, I tried finding on the tags options a "custom beat" tag, but there is not one there, could I suggest adding such a tag?

I have created a custom beat based on libbeat v7.
I have defined a struct that implements the beat.ACKer interface.

//ACKerStruct ... Define ACK handler
type ACKerStruct struct{}

var eventsTrackSem = sync.Mutex{}
var eventsLeft = 0

//AddEvent .. Does nothing, is an interface requirement.
func (ah *ACKerStruct) AddEvent(event beat.Event, published bool) {
	fmt.Println("AddEvent Has been Called")
	fmt.Printf("\nBeat Event: %v\n", event)
	fmt.Printf("\nPublished Value: %v\n", published)
}

//ACKEvents ... decreases the list eventsLeft, shielded behind a semaphor.
func (ah *ACKerStruct) ACKEvents(n int) {
	fmt.Printf("\nACKEvents called with n = %v\n", n)
	eventsTrackSem.Lock()
	eventsLeft -= n
	eventsTrackSem.Unlock()
}

//Close ... Does nothing, is a interface requirement.
func (ah *ACKerStruct) Close() {}
func increaseEventCount() {
	eventsTrackSem.Lock()
	eventsLeft++
	eventsTrackSem.Unlock()
}

Now, in the New function that creates my beat, I added the code:

//New ... Creates an instance of coletorbeat.
func New(b *beat.Beat, cfg *common.Config) (beat.Beater, error) {
	c := config.DefaultConfig
	if err := cfg.Unpack(&c); err != nil {
		return nil, fmt.Errorf("Error reading config file: %v", err)
	}

	myackhandler := &ACKerStruct{}
	client, err := b.Publisher.ConnectWith(beat.ClientConfig{
		PublishMode: beat.GuaranteedSend,
		Processing:  beat.ProcessingConfig{},
		CloseRef:    nil,
		WaitClose:   30 * time.Second,
		ACKHandler:  myackhandler,
	})
	if err != nil {
		panic(err)
	}

	bt := &coletorbeat{
		done:   make(chan struct{}),
		config: c,
		client: client,
	}

	return bt, nil
}

I expected to see the prints when I run the beat, whenever an event is sent to the pipeline, but I don't see anything, it is as if the ACKer function is not being called at all.
I would like some help finding out what I'm doing wrong if you can.
Thats it

This topic was automatically closed 28 days after the last reply. New replies are no longer allowed.