How can I set up file offset state using the Filebeat source code?

I'm trying to import the Filebeat source code as a library to read text from files. I understand that Filebeat saves each file's offset to the filesystem so that, when it restarts, it doesn't read the file from the beginning again. How can I implement this behavior? I wrote the simple program below, but every time I restart it, it reads the file from the beginning.

package main

import (
	"fmt"
	"github.com/elastic/beats/filebeat/channel"
	"github.com/elastic/beats/filebeat/input"
	"github.com/elastic/beats/filebeat/input/file"
	"github.com/elastic/beats/filebeat/input/log"
	"github.com/elastic/beats/libbeat/beat"
	"github.com/elastic/beats/libbeat/common"
	"path"
	"sync"
	"time"
)

// eventCapturer is an outlet that hands every event it receives to a
// caller-supplied channel.
type eventCapturer struct {
	// events receives each event passed to OnEvent.
	events chan beat.Event

	// c is closed by Close and exposed to callers through Done.
	c chan struct{}

	// closeOnce makes sure the body of Close runs at most once.
	closeOnce sync.Once

	// closed is set to true by Close.
	closed bool
}

// OnEvent forwards event to the capture channel and reports whether it was
// accepted.
//
// Once the capturer has been closed it returns false without delivering the
// event, so a producer is not left blocked on a channel that may no longer be
// drained. While the capturer is open the call blocks until the event has
// been queued or the capturer is closed.
func (o *eventCapturer) OnEvent(event beat.Event) bool {
	// Check for closure first: a plain two-way select picks at random when
	// both cases are ready and could still deliver after Close.
	select {
	case <-o.c:
		return false
	default:
	}

	select {
	case <-o.c:
		return false
	case o.events <- event:
		return true
	}
}

// Close marks the capturer as closed and closes the channel returned by Done.
// Only the first call has any effect; the returned error is always nil.
func (o *eventCapturer) Close() error {
	shutdown := func() {
		o.closed = true
		close(o.c)
	}
	o.closeOnce.Do(shutdown)
	return nil
}

// Done returns a receive-only view of the channel that Close closes.
func (o *eventCapturer) Done() <-chan struct{} {
	var done <-chan struct{} = o.c
	return done
}

// NewEventCapturer returns an outlet that sends every event it receives to
// events. The outlet starts with a fresh, open done channel.
func NewEventCapturer(events chan beat.Event) channel.Outleter {
	capturer := new(eventCapturer)
	capturer.events = events
	capturer.c = make(chan struct{})
	return capturer
}

// main runs a filebeat log input over every *.log file under /home/sf/logs
// and prints each event it produces. It exits when an event carries a
// finished file state, or when 30 seconds have elapsed without one.
//
// NOTE(review): nothing here saves or restores read offsets, so every run
// starts from the beginning of the files. input.Context presumably accepts
// previously saved file.State values to resume from — confirm against the
// filebeat registrar before relying on it.
func main() {
	config, err := common.NewConfigFrom(common.MapStr{
		"paths":     path.Join("/home/sf/logs", "*.log"),
		"close_eof": false,
	})
	if err != nil {
		fmt.Println(err)
		panic(err)
	}

	events := make(chan beat.Event, 100)
	defer close(events)
	capturer := NewEventCapturer(events)
	connector := channel.ConnectorFunc(func(_ *common.Config, _ beat.ClientConfig) (channel.Outleter, error) {
		return channel.SubOutlet(capturer), nil
	})
	inputCtx := input.Context{
		Done:     make(chan struct{}),
		BeatDone: make(chan struct{}),
	}
	inp, err := log.NewInput(config, connector, inputCtx)
	if err != nil {
		fmt.Println(err)
		panic(err)
	}
	inp.Run()
	defer inp.Stop()
	// Deferred last so it runs first: the shutdown channels are closed exactly
	// once, on every exit path, before the input is stopped.
	defer func() {
		close(inputCtx.Done)
		close(inputCtx.BeatDone)
	}()

	timeout := time.After(30 * time.Second)
	for {
		select {
		case event := <-events:
			fmt.Println(event)
			// Return directly instead of signalling through a goroutine, so a
			// second finished event can never close the channels twice.
			if state, ok := event.Private.(file.State); ok && state.Finished {
				return
			}
		case <-timeout:
			fmt.Println("timeout waiting for closed state")
			// The timer fires only once; without returning here the loop
			// would keep waiting forever.
			return
		}
	}
}

Sorry for my innocent question but is it legal to import filebeat like this?
Thanks.

Please help ...

This topic was automatically closed 28 days after the last reply. New replies are no longer allowed.