[Important] Filebeat goroutine leak fix (issue 7820) is missing in v6.5


(Oldcodeoberyn) #1

I don't know the reason behind it, but the corrections brought in by https://github.com/elastic/beats/pull/7820 are missing in Filebeat v6.5. Because of this, we are suffering from the memory leak issue again after upgrading to the new version.

please check the code: https://github.com/elastic/beats/blob/6.5/filebeat/channel/util.go

package channel

import (
	"github.com/elastic/beats/filebeat/util"
	"github.com/elastic/beats/libbeat/beat"
	"github.com/elastic/beats/libbeat/common"
	"github.com/elastic/beats/libbeat/common/atomic"
)

// subOutlet is the Outleter returned by SubOutlet. It relays events to a
// wrapped outlet through a forwarding goroutine and can be closed on its own.
type subOutlet struct {
	isOpen atomic.Bool     // initialised to true in SubOutlet; swapped to false by Close
	done   chan struct{}   // closed by Close to signal shutdown to OnEvent
	ch     chan *util.Data // carries events from OnEvent to the forwarding goroutine
	res    chan bool       // carries the wrapped outlet's OnEvent result back to OnEvent
}

// ConnectTo binds pipeline and factory into a Connector. Invoking the
// returned Connector with a config and a metadata pointer calls factory with
// the bound pipeline plus those two arguments and returns its result as-is.
func ConnectTo(pipeline beat.Pipeline, factory Factory) Connector {
	connect := func(cfg *common.Config, m *common.MapStrPointer) (Outleter, error) {
		return factory(pipeline, cfg, m)
	}
	return connect
}

// SubOutlet wraps out in a sub-outlet that can be closed individually,
// without closing the underlying outlet.
//
// A background goroutine relays every event received on the internal event
// channel to out.OnEvent and publishes the result on the result channel. The
// goroutine runs until the event channel is closed.
func SubOutlet(out Outleter) Outleter {
	sub := &subOutlet{
		// Capacity 1: one result can be stored without a waiting reader.
		res:    make(chan bool, 1),
		ch:     make(chan *util.Data),
		done:   make(chan struct{}),
		isOpen: atomic.MakeBool(true),
	}

	relay := func() {
		for {
			data, more := <-sub.ch
			if !more {
				// Event channel closed: nothing left to forward.
				return
			}
			sub.res <- out.OnEvent(data)
		}
	}
	go relay()

	return sub
}

// Close marks the sub-outlet as closed and closes the done channel. Only the
// first call closes done; later calls are no-ops. It always returns nil.
//
// Close does not close o.ch, so it does not by itself stop the goroutine
// started in SubOutlet.
func (o *subOutlet) Close() error {
	if wasOpen := o.isOpen.Swap(false); !wasOpen {
		// An earlier call already closed done; closing it twice would panic.
		return nil
	}
	close(o.done)
	return nil
}

// OnEvent hands d to the forwarding goroutine started by SubOutlet and
// reports the outcome.
//
// It returns false without forwarding when the sub-outlet is already closed,
// the wrapped outlet's result when one arrives on o.res, and true when the
// sub-outlet is closed after d was handed over but before a result arrived.
//
// NOTE(review): o.ch is closed only from inside this method. Once Close has
// set isOpen to false, every later call returns at the first check and never
// reaches close(o.ch). Unless a call is already past that check when Close
// runs, the goroutine ranging over o.ch in SubOutlet never exits.
func (o *subOutlet) OnEvent(d *util.Data) bool {
	// Fast path: Close has already been called.
	if !o.isOpen.Load() {
		return false
	}

	select {
	case <-o.done:
		// Close was called before the event could be handed over: end the
		// forwarding goroutine's range loop and report the event as rejected.
		close(o.ch)
		return false

	case o.ch <- d:
		// The event is now with the forwarding goroutine; wait for its result
		// or for shutdown.
		select {
		case <-o.done:

			// Note: log harvester specific (leaky abstractions).
			//  The close at this point in time indicates an event
			//  already sent to the publisher worker, forwarding events
			//  to the publisher pipeline. The harvester insists on updating the state
			//  (by pushing another state update to the publisher pipeline) on shutdown
			//  and requires most recent state update in the harvester (who can only
			//  update state on 'true' response).
			//  The state update will appear after the current event in the publisher pipeline.
			//  That is, by returning true here, the final state update will
			//  be presented to the registrar, after the last event being processed.
			//  Once all messages are in the publisher pipeline, in correct order,
			//  it depends on registrar/publisher pipeline if state is finally updated
			//  in the registrar.

			close(o.ch)
			return true

		case ret := <-o.res:
			return ret
		}
	}
}

// CloseOnSignal returns outlet unchanged after arranging for outlet.Close to
// be called as soon as a receive on sig succeeds (a value is sent or the
// channel is closed). With a nil sig no goroutine is started and the outlet
// is never closed by this helper.
func CloseOnSignal(outlet Outleter, sig <-chan struct{}) Outleter {
	if sig == nil {
		return outlet
	}

	waitAndClose := func() {
		<-sig
		outlet.Close()
	}
	go waitAndClose()

	return outlet
}

It is missing the correction,

but in v6.4 and master

it is:

package channel

import (
	"sync"

	"github.com/elastic/beats/filebeat/util"
	"github.com/elastic/beats/libbeat/beat"
	"github.com/elastic/beats/libbeat/common"
)

// subOutlet is the Outleter returned by SubOutlet. It relays events to a
// wrapped outlet through a forwarding goroutine and can be closed on its own.
type subOutlet struct {
	done      chan struct{}   // closed by Close to signal shutdown to OnEvent
	ch        chan *util.Data // carries events from OnEvent to the forwarding goroutine; closed by Close
	res       chan bool       // carries the wrapped outlet's OnEvent result back to OnEvent
	mutex     sync.Mutex      // held by OnEvent for its whole duration and by Close while closing ch
	closeOnce sync.Once       // makes the body of Close run at most once
}

// ConnectTo binds pipeline and factory into a Connector. Invoking the
// returned Connector with a config and a metadata pointer calls factory with
// the bound pipeline plus those two arguments and returns its result as-is.
func ConnectTo(pipeline beat.Pipeline, factory Factory) Connector {
	connect := func(cfg *common.Config, m *common.MapStrPointer) (Outleter, error) {
		return factory(pipeline, cfg, m)
	}
	return connect
}

// SubOutlet wraps out in a sub-outlet that can be closed individually,
// without closing the underlying outlet.
//
// A background goroutine relays every event received on the internal event
// channel to out.OnEvent and publishes the result on the result channel. The
// goroutine runs until the event channel is closed.
func SubOutlet(out Outleter) Outleter {
	sub := &subOutlet{
		// Capacity 1: one result can be stored without a waiting reader.
		res:  make(chan bool, 1),
		ch:   make(chan *util.Data),
		done: make(chan struct{}),
	}

	relay := func() {
		for {
			data, more := <-sub.ch
			if !more {
				// Event channel closed: nothing left to forward.
				return
			}
			sub.res <- out.OnEvent(data)
		}
	}
	go relay()

	return sub
}

// Close shuts the sub-outlet down: it closes done and then the event channel.
// closeOnce guarantees the shutdown body runs at most once, so repeated calls
// are safe. It always returns nil.
//
// Closing the event channel ends the range loop of the goroutine started in
// SubOutlet.
func (o *subOutlet) Close() error {
	o.closeOnce.Do(func() {
		// Signal OnEvent() to terminate. This happens before taking the
		// mutex: OnEvent holds the mutex while it blocks in its selects, and
		// a closed done channel is what lets it return and release the lock.
		close(o.done)
		// This mutex prevents the event channel from being closed while
		// OnEvent is still running (and possibly sending on it).
		o.mutex.Lock()
		defer o.mutex.Unlock()
		close(o.ch)
	})
	return nil
}

// OnEvent hands d to the forwarding goroutine started by SubOutlet and
// reports the outcome.
//
// It returns false without forwarding when the sub-outlet is already closed,
// the wrapped outlet's result when one arrives on o.res, and true when the
// sub-outlet is closed after d was handed over but before a result arrived.
//
// The mutex is held for the whole call, so Close cannot close o.ch while a
// send on it may be in progress.
func (o *subOutlet) OnEvent(d *util.Data) bool {

	o.mutex.Lock()
	defer o.mutex.Unlock()
	// Non-blocking check first: if done is already closed, return before the
	// select below can pick the send case. By then o.ch may have been closed
	// by Close, and sending on a closed channel panics.
	select {
	case <-o.done:
		return false
	default:
	}

	select {
	case <-o.done:
		// Close was called before the event could be handed over.
		return false

	case o.ch <- d:
		// The event is now with the forwarding goroutine; wait for its result
		// or for shutdown.
		select {
		case <-o.done:

			// Note: log harvester specific (leaky abstractions).
			//  The close at this point in time indicates an event
			//  already sent to the publisher worker, forwarding events
			//  to the publisher pipeline. The harvester insists on updating the state
			//  (by pushing another state update to the publisher pipeline) on shutdown
			//  and requires most recent state update in the harvester (who can only
			//  update state on 'true' response).
			//  The state update will appear after the current event in the publisher pipeline.
			//  That is, by returning true here, the final state update will
			//  be presented to the registrar, after the last event being processed.
			//  Once all messages are in the publisher pipeline, in correct order,
			//  it depends on registrar/publisher pipeline if state is finally updated
			//  in the registrar.
			return true

		case ret := <-o.res:
			return ret
		}
	}
}

// CloseOnSignal returns outlet unchanged after arranging for outlet.Close to
// be called as soon as a receive on sig succeeds (a value is sent or the
// channel is closed). With a nil sig no goroutine is started and the outlet
// is never closed by this helper.
func CloseOnSignal(outlet Outleter, sig <-chan struct{}) Outleter {
	if sig == nil {
		return outlet
	}

	waitAndClose := func() {
		<-sig
		outlet.Close()
	}
	go waitAndClose()

	return outlet
}

(Pier-Hugues Pellerin) #2

Thanks @oldcodeoberyn, really good investigation!

I can confirm this using the following:

The outputs show a difference in the files:

git diff 6.5..master --   filebeat/channel/util.go  
git diff 6.x..master --   filebeat/channel/util.go 

This shows nothing:

git diff 6.4..master --   filebeat/channel/util.go  

(Pier-Hugues Pellerin) #3

I've created the backports https://github.com/elastic/beats/pull/9593 and https://github.com/elastic/beats/pull/9592. I will get them merged as soon as possible, and the fix will go into the next release.


(system) #4

This topic was automatically closed 28 days after the last reply. New replies are no longer allowed.