I'm trying to import the Filebeat source code as a library to read text from files. I understand that Filebeat saves each file's read offset to disk so that, after a restart, it does not read the file from the beginning again. How can I implement this behavior? I wrote the simple code below, but every time I restart the program it reads the file from the beginning.
package main
import (
"fmt"
"github.com/elastic/beats/filebeat/channel"
"github.com/elastic/beats/filebeat/input"
"github.com/elastic/beats/filebeat/input/file"
"github.com/elastic/beats/filebeat/input/log"
"github.com/elastic/beats/libbeat/beat"
"github.com/elastic/beats/libbeat/common"
"path"
"sync"
"time"
)
// eventCapturer forwards every event handed to OnEvent into the events
// channel and exposes a done channel that is closed by Close.
type eventCapturer struct {
	// events receives each event passed to OnEvent.
	events chan beat.Event

	// c is the done channel: closed by Close, exposed through Done.
	c chan struct{}
	// closeOnce ensures the body of Close runs a single time.
	closeOnce sync.Once
	// closed is set to true by Close.
	closed bool
}
// OnEvent delivers event to the events channel.
//
// It returns true once the event has been handed over. If the capturer has
// been closed (the done channel is closed) before the send can complete, the
// event is dropped and false is returned, so a producer is never left blocked
// on a channel that nobody is draining any more.
func (o *eventCapturer) OnEvent(event beat.Event) bool {
	select {
	case <-o.c:
		// Closed: give up instead of blocking on the send forever.
		return false
	case o.events <- event:
		return true
	}
}
// Close flags the capturer as closed and closes its done channel. The work is
// wrapped in closeOnce, so repeated calls are harmless. It always returns nil.
func (o *eventCapturer) Close() error {
	shutdown := func() {
		o.closed = true
		close(o.c)
	}
	o.closeOnce.Do(shutdown)
	return nil
}
// Done exposes the capturer's done channel as receive-only; Close closes it.
func (o *eventCapturer) Done() <-chan struct{} {
	var done <-chan struct{} = o.c
	return done
}
// NewEventCapturer builds an outleter that pushes every received event into
// events. A fresh, open done channel is allocated for the returned value.
func NewEventCapturer(events chan beat.Event) channel.Outleter {
	capturer := new(eventCapturer)
	capturer.events = events
	capturer.c = make(chan struct{})
	return capturer
}
// main runs filebeat's log input over /home/sf/logs/*.log and prints every
// event it receives. It exits when an event carries a file.State marked
// Finished, or after 30 seconds, whichever happens first.
//
// NOTE(review): nothing in this program persists read offsets, which is why
// every run starts from the beginning of each file. In filebeat that job
// appears to belong to the registrar; to resume after a restart, the
// file.State values seen in event.Private would have to be saved and handed
// back to the input on startup (presumably through input.Context.States) —
// confirm against the filebeat version being imported.
func main() {
	config, err := common.NewConfigFrom(common.MapStr{
		"paths":     path.Join("/home/sf/logs", "*.log"),
		"close_eof": false,
	})
	if err != nil {
		// A bad config would otherwise surface later as a confusing failure.
		fmt.Println(err)
		panic(err)
	}

	// The channel is deliberately never closed here: this function is the
	// receiver, and closing it could make a late OnEvent call panic with
	// "send on closed channel". The process exits right after main returns.
	events := make(chan beat.Event, 100)
	capturer := NewEventCapturer(events)
	connector := channel.ConnectorFunc(func(_ *common.Config, _ beat.ClientConfig) (channel.Outleter, error) {
		return channel.SubOutlet(capturer), nil
	})

	inputCtx := input.Context{
		Done:     make(chan struct{}),
		BeatDone: make(chan struct{}),
	}

	inp, err := log.NewInput(config, connector, inputCtx)
	if err != nil {
		fmt.Println(err)
		panic(err)
	}
	inp.Run()
	defer inp.Stop()

	timeout := time.After(30 * time.Second)
	for {
		select {
		case event := <-events:
			fmt.Println(event)
			if state, ok := event.Private.(file.State); ok && state.Finished {
				// Signal shutdown exactly once and leave immediately; a
				// second finished state can no longer trigger a double close.
				close(inputCtx.Done)
				close(inputCtx.BeatDone)
				return
			}
		case <-timeout:
			fmt.Println("timeout waiting for closed state")
			// The timer fires only once; without returning here the loop
			// would keep blocking and the timeout would have no effect.
			return
		}
	}
}
Sorry for the naive question, but is it legal to import Filebeat as a library like this?
Thanks.