First off, I looked through the tag options for a "custom beat" tag but could not find one. Could I suggest adding such a tag?
I have created a custom beat based on libbeat v7.
I have defined a struct that implements the beat.ACKer interface.
// ACKerStruct ... Defines the ACK handler.
type ACKerStruct struct{}

// eventsTrackSem guards eventsLeft, the count of events not yet ACKed.
var eventsTrackSem = sync.Mutex{}
var eventsLeft = 0

// AddEvent ... Required by the interface; only prints its arguments for debugging.
func (ah *ACKerStruct) AddEvent(event beat.Event, published bool) {
	fmt.Println("AddEvent Has been Called")
	fmt.Printf("\nBeat Event: %v\n", event)
	fmt.Printf("\nPublished Value: %v\n", published)
}

// ACKEvents ... Decrements the eventsLeft counter by n, guarded by a mutex.
func (ah *ACKerStruct) ACKEvents(n int) {
	fmt.Printf("\nACKEvents called with n = %v\n", n)
	eventsTrackSem.Lock()
	eventsLeft -= n
	eventsTrackSem.Unlock()
}

// Close ... Does nothing; it is an interface requirement.
func (ah *ACKerStruct) Close() {}

// increaseEventCount increments the eventsLeft counter, guarded by the same mutex.
func increaseEventCount() {
	eventsTrackSem.Lock()
	eventsLeft++
	eventsTrackSem.Unlock()
}
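For reference, the counting logic can be checked in isolation from libbeat. The sketch below is a minimal, self-contained version under stated assumptions: `acker` is a local stand-in interface with the same three method names, events are plain strings instead of `beat.Event`, and `countingACKer` and `Pending` are hypothetical names. It also counts inside `AddEvent` instead of using a separate `increaseEventCount` helper, which is a design choice of the sketch and not something my beat does.

```go
package main

import (
	"fmt"
	"sync"
)

// acker is a local stand-in for the three methods described above.
// It is NOT libbeat's type: events are plain strings here (assumption).
type acker interface {
	AddEvent(event string, published bool)
	ACKEvents(n int)
	Close()
}

// countingACKer (hypothetical name) tracks how many published events
// have not been acknowledged yet. The mutex guards the counter.
type countingACKer struct {
	mu      sync.Mutex
	pending int
}

// AddEvent counts an event only if it was actually published.
func (a *countingACKer) AddEvent(event string, published bool) {
	if !published {
		return
	}
	a.mu.Lock()
	a.pending++
	a.mu.Unlock()
}

// ACKEvents subtracts n acknowledged events from the counter.
func (a *countingACKer) ACKEvents(n int) {
	a.mu.Lock()
	a.pending -= n
	a.mu.Unlock()
}

// Close does nothing; it only satisfies the interface.
func (a *countingACKer) Close() {}

// Pending returns the current number of unacknowledged events.
func (a *countingACKer) Pending() int {
	a.mu.Lock()
	defer a.mu.Unlock()
	return a.pending
}

// Compile-time check that countingACKer satisfies the stand-in interface.
var _ acker = (*countingACKer)(nil)

func main() {
	a := &countingACKer{}
	for i := 0; i < 3; i++ {
		a.AddEvent(fmt.Sprintf("event-%d", i), true)
	}
	a.AddEvent("dropped", false) // not published, so not counted
	a.ACKEvents(2)
	fmt.Println("pending:", a.Pending())
}
```

Keeping the counter and its mutex inside the struct avoids package-level state, so each client could have its own handler with its own count.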
Now, in the New function that creates my beat, I added the code:
// New ... Creates an instance of coletorbeat.
func New(b *beat.Beat, cfg *common.Config) (beat.Beater, error) {
	c := config.DefaultConfig
	if err := cfg.Unpack(&c); err != nil {
		return nil, fmt.Errorf("Error reading config file: %v", err)
	}
	myackhandler := &ACKerStruct{}
	client, err := b.Publisher.ConnectWith(beat.ClientConfig{
		PublishMode: beat.GuaranteedSend,
		Processing:  beat.ProcessingConfig{},
		CloseRef:    nil,
		WaitClose:   30 * time.Second,
		ACKHandler:  myackhandler,
	})
	if err != nil {
		panic(err)
	}
	bt := &coletorbeat{
		done:   make(chan struct{}),
		config: c,
		client: client,
	}
	return bt, nil
}
When I run the beat, I expected to see these prints whenever an event is sent to the pipeline, but nothing is printed. It is as if none of the ACKer methods are being called at all.
I would appreciate some help finding out what I'm doing wrong.
That's it.