[Important] Filebeat goroutine leaking fixing (issue 7820) is missing in v6.5

I don't know what is the reason behind that, but the correction bing in by https://github.com/elastic/beats/pull/7820, are missing in Filebeat v6.5, this cause that we suffer the the memory leak issue again after upgrade to the new version

please check the code: https://github.com/elastic/beats/blob/6.5/filebeat/channel/util.go

package channel

import (
	"github.com/elastic/beats/filebeat/util"
	"github.com/elastic/beats/libbeat/beat"
	"github.com/elastic/beats/libbeat/common"
	"github.com/elastic/beats/libbeat/common/atomic"
)

type subOutlet struct {
	isOpen atomic.Bool
	done   chan struct{}
	ch     chan *util.Data
	res    chan bool
}

// ConnectTo creates a new Connector, combining a beat.Pipeline with an outlet Factory.
func ConnectTo(pipeline beat.Pipeline, factory Factory) Connector {
	return func(cfg *common.Config, m *common.MapStrPointer) (Outleter, error) {
		return factory(pipeline, cfg, m)
	}
}

// SubOutlet create a sub-outlet, which can be closed individually, without closing the
// underlying outlet.
func SubOutlet(out Outleter) Outleter {
	s := &subOutlet{
		isOpen: atomic.MakeBool(true),
		done:   make(chan struct{}),
		ch:     make(chan *util.Data),
		res:    make(chan bool, 1),
	}

	go func() {
		for event := range s.ch {
			s.res <- out.OnEvent(event)
		}
	}()

	return s
}

func (o *subOutlet) Close() error {
	isOpen := o.isOpen.Swap(false)
	if isOpen {
		close(o.done)
	}
	return nil
}

func (o *subOutlet) OnEvent(d *util.Data) bool {
	if !o.isOpen.Load() {
		return false
	}

	select {
	case <-o.done:
		close(o.ch)
		return false

	case o.ch <- d:
		select {
		case <-o.done:

			// Note: log harvester specific (leaky abstractions).
			//  The close at this point in time indicates an event
			//  already send to the publisher worker, forwarding events
			//  to the publisher pipeline. The harvester insists on updating the state
			//  (by pushing another state update to the publisher pipeline) on shutdown
			//  and requires most recent state update in the harvester (who can only
			//  update state on 'true' response).
			//  The state update will appear after the current event in the publisher pipeline.
			//  That is, by returning true here, the final state update will
			//  be presented to the registrar, after the last event being processed.
			//  Once all messages are in the publisher pipeline, in correct order,
			//  it depends on registrar/publisher pipeline if state is finally updated
			//  in the registrar.

			close(o.ch)
			return true

		case ret := <-o.res:
			return ret
		}
	}
}

// CloseOnSignal closes the outlet, once the signal triggers.
func CloseOnSignal(outlet Outleter, sig <-chan struct{}) Outleter {
	if sig != nil {
		go func() {
			<-sig
			outlet.Close()
		}()
	}
	return outlet
}

it missing the correction:

but it v6.4 and master

it is

package channel

import (
	"sync"

	"github.com/elastic/beats/filebeat/util"
	"github.com/elastic/beats/libbeat/beat"
	"github.com/elastic/beats/libbeat/common"
)

type subOutlet struct {
	done      chan struct{}
	ch        chan *util.Data
	res       chan bool
	mutex     sync.Mutex
	closeOnce sync.Once
}

// ConnectTo creates a new Connector, combining a beat.Pipeline with an outlet Factory.
func ConnectTo(pipeline beat.Pipeline, factory Factory) Connector {
	return func(cfg *common.Config, m *common.MapStrPointer) (Outleter, error) {
		return factory(pipeline, cfg, m)
	}
}

// SubOutlet create a sub-outlet, which can be closed individually, without closing the
// underlying outlet.
func SubOutlet(out Outleter) Outleter {
	s := &subOutlet{
		done: make(chan struct{}),
		ch:   make(chan *util.Data),
		res:  make(chan bool, 1),
	}

	go func() {
		for event := range s.ch {
			s.res <- out.OnEvent(event)
		}
	}()

	return s
}

func (o *subOutlet) Close() error {
	o.closeOnce.Do(func() {
		// Signal OnEvent() to terminate
		close(o.done)
		// This mutex prevents the event channel to be closed if OnEvent is
		// still running.
		o.mutex.Lock()
		defer o.mutex.Unlock()
		close(o.ch)
	})
	return nil
}

func (o *subOutlet) OnEvent(d *util.Data) bool {

	o.mutex.Lock()
	defer o.mutex.Unlock()
	select {
	case <-o.done:
		return false
	default:
	}

	select {
	case <-o.done:
		return false

	case o.ch <- d:
		select {
		case <-o.done:

			// Note: log harvester specific (leaky abstractions).
			//  The close at this point in time indicates an event
			//  already send to the publisher worker, forwarding events
			//  to the publisher pipeline. The harvester insists on updating the state
			//  (by pushing another state update to the publisher pipeline) on shutdown
			//  and requires most recent state update in the harvester (who can only
			//  update state on 'true' response).
			//  The state update will appear after the current event in the publisher pipeline.
			//  That is, by returning true here, the final state update will
			//  be presented to the registrar, after the last event being processed.
			//  Once all messages are in the publisher pipeline, in correct order,
			//  it depends on registrar/publisher pipeline if state is finally updated
			//  in the registrar.
			return true

		case ret := <-o.res:
			return ret
		}
	}
}

// CloseOnSignal closes the outlet, once the signal triggers.
func CloseOnSignal(outlet Outleter, sig <-chan struct{}) Outleter {
	if sig != nil {
		go func() {
			<-sig
			outlet.Close()
		}()
	}
	return outlet
}

Thanks @oldcodeoberyn really good investigation!

I can confirm this using the following:

The outputs show a difference in the files:

git diff 6.5..master --   filebeat/channel/util.go  
git diff 6.x..master --   filebeat/channel/util.go 

This show nothing.

git diff 6.4..master --   filebeat/channel/util.go  

I've created the backport https://github.com/elastic/beats/pull/9593 and https://github.com/elastic/beats/pull/9592 I will get that merge as soon as possible and the fix will go in the next release.

This topic was automatically closed 28 days after the last reply. New replies are no longer allowed.