Flushing after yourself


(Ecc256) #1

I’ve rewritten my Logstash processing logic as a beat.
The beat starts a lot faster, consumes less memory and feels snappier.
Overall, beats are a pleasure to work with; even debugging is fun.

The only problem is that it keeps losing the last published events.
Calling time.Sleep(2 * time.Second) just before returning from the Run() method helps on my development computer.
(It doesn’t run continuously as of now. For testing purposes I publish a few thousand events and exit.)

I guess that when it runs on a real server, which might be quite busy at times, 2 seconds or more might not be enough.

I saw a comment somewhere in the libbeat source files saying this is expected behavior, but I cannot find where it was now...

What is the right way to flush pipeline before closing to avoid dropping events?

There was a somewhat related question. I was not able to find how Sync and Guaranteed are used in the PublishEvents call in winlogbeat.go.
Perhaps the implementation has changed since then?


(Steffen Siering) #2

Which libbeat version are you using?

In filebeat we introduced a shutdown wait timeout (shutdown_timeout). When filebeat shuts down, it first stops all producers (prospectors and harvesters), but keeps the publishing pipeline and outputs alive. For this to work, filebeat uses a counter of active events. When an event is published, the counter is increased, and when the asynchronous ACK signal is returned from the pipeline, the counter is decreased. After shutting down all producers, filebeat waits for the counter to become 0. Filebeat continues the shutdown sequence once the counter has become 0 or once it has already waited for the duration configured by shutdown_timeout.

The problem is, we don't want to wait forever. If an output is unresponsive, we don't want to block shutdown for hours. One is supposed to configure the wait duration to the max acceptable wait on shutdown (e.g. systemd kills the process if shutdown takes too long).

In 6.0 we rewrote the publisher pipeline and added a similar feature to libbeat. When creating the pipeline, one can pass additional shutdown settings to it, so you don't need to do any accounting of active events in your beat.

Unfortunately, no beat can use this setting yet, as we need to clean up the filebeat shutdown before introducing the setting for all beats, so that the shutdown handling of filebeat and of the libbeat publisher pipeline do not interfere.

In 6.0, one uses a beat.Client instance as the connection to the publisher pipeline. Each event producer should have its own client. When connecting to the publisher pipeline, one can pass additional settings to (beat.Pipeline).ConnectWith. Setting WaitClose and one of the ACK... handlers (ACKCount is the least intrusive) should get you a similar shutdown wait behavior, but per producer, not global.


(Ecc256) #3

Steffen,
Thanks a lot for your response!

I’m using 6.2.
All tests are done with the simplest beat possible, created with:
python ${GOPATH}/src/github.com/elastic/beats/script/generate.py --type=beat ...

Understood.
And that’s why I want to use a different approach.
I need to know the lowest dropped event._id (aka document_id for Logstash) for a pipeline, so I know where to restart.
event._id is set via event.SetID() and is always ascending in my case.

Even if events published by the beat are not posted to the output in the same order, and the same event(s) might be posted again after a restart, it won’t be a problem.

Do you think it is a viable approach?
If yes, how can I get the lowest dropped event._id for a pipeline?
Do I have to write my own pipeline to get it?
I don’t think it’s possible to hook into the existing pipeline ${GOPATH}/src/github.com/elastic/beats/libbeat/beat/pipeline.go
Am I wrong about it?


(Steffen Siering) #4

The pipeline is set up by libbeat. The operator can configure how exactly the pipeline will be configured (we just added spooling to disk). You should not have to set up the pipeline in your own beat.

The pipeline has a producer/client, which is your own beat, and consumers, which are the outputs.

The pipeline decouples the beat from the outputs.
The interface between the pipeline and the beat mandates:

  • producer publishes single events
  • pipeline returns ordered ACK to producer or global ACK handler.
    • this means your ACK handler will be invoked in the same order in which you published events. Using ACKCount, the count N always refers to the oldest N not-yet-ACKed events you have published. There is no out-of-order signaling.
  • pipeline always ACKs events. From the producer's point of view, there is no event drop.
    => event drops and infinite retry are configured and handled in the publisher pipeline.

The interface between the pipeline and the outputs works a little differently:

  • ACK can be out of order
  • ACK can be asynchronous at any time
  • pipeline distributes batches of events to outputs
  • outputs never drop events, but report errors/retry => the pipeline configuration (that is, the operator) decides whether events will be dropped or retried.
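The following sketch illustrates how out-of-order batch ACKs from outputs can still be reported to a producer as ordered counts, which is the guarantee ACKCount relies on. It is a hypothetical model (`orderedACK`, `Add` and `ACK` are made-up names), not libbeat's queue code.

```go
package main

import "sync"

// orderedACK converts batch ACKs that may arrive out of order (as from
// outputs) into in-order ACK counts for one producer, like ACKCount.
// Illustrative sketch; libbeat's internal implementation differs.
type orderedACK struct {
	mu      sync.Mutex
	nextSeq uint64          // sequence number for the next batch
	oldest  uint64          // oldest batch not yet reported to the producer
	sizes   map[uint64]int  // batch sizes by sequence number
	acked   map[uint64]bool // batches ACKed by an output, not yet reported
	onACK   func(n int)     // producer callback
}

func newOrderedACK(onACK func(n int)) *orderedACK {
	return &orderedACK{
		sizes: map[uint64]int{},
		acked: map[uint64]bool{},
		onACK: onACK,
	}
}

// Add registers a batch of n events in publish order and returns its
// sequence number.
func (o *orderedACK) Add(n int) uint64 {
	o.mu.Lock()
	defer o.mu.Unlock()
	seq := o.nextSeq
	o.nextSeq++
	o.sizes[seq] = n
	return seq
}

// ACK marks a batch as acknowledged; it may be called in any order. Only
// the contiguous prefix of acknowledged batches is reported, so the
// producer never sees a gap.
func (o *orderedACK) ACK(seq uint64) {
	o.mu.Lock()
	defer o.mu.Unlock()
	o.acked[seq] = true
	total := 0
	for o.acked[o.oldest] {
		total += o.sizes[o.oldest]
		delete(o.acked, o.oldest)
		delete(o.sizes, o.oldest)
		o.oldest++
	}
	if total > 0 {
		o.onACK(total) // called under the lock, so counts arrive serially
	}
}
```

If batches 2 and 3 are ACKed before batch 1, nothing is reported until batch 1 arrives, at which point a single count covering all three batches is delivered.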

There are a few hooks one can use. The hooks guarantee that all events reported to your callback have actually been ACKed by an output (or dropped, depending on the internal configuration).

If I understand you correctly, you want to 'remember' the ID of the next non-ACKed event on shutdown. This is a similar problem to filebeat's. In filebeat we want to store the offset of the next event in the registry file, not the offset of the last ACKed event, so we don't send the event again. Filebeat solves this by storing the next offset to read from in (beat.Event).Private. Filebeat installs a global ACK handler with the pipeline, and forwards the 'next' state to the registrar after 'casting' all values to file.State. The ACK handler gets the 'last' ACKed event for each producer in one batch.

When publishing event i, do you already know the ID of event i+1? If so, you can store the ID of event i+1 in the Private field of event i and just do as filebeat does.
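A sketch of the Private-field approach, assuming the next ID is known when event i is published. `event` here is a stand-in for `beat.Event` with only an ID and a Private value, and `registry.ackLastEvent` mimics an ACKLastEvent-style callback; all names are hypothetical.

```go
package main

// event is a stand-in for beat.Event: only the ID and the opaque Private
// value that the pipeline hands back to the ACK handler are modeled.
type event struct {
	ID      int
	Private interface{}
}

// withContinuation stores in event i's Private field the ID to resume from
// once event i is ACKed, i.e. the ID of event i+1. The last event gets
// nextAfterLast, the next ID the data source is expected to produce.
func withContinuation(ids []int, nextAfterLast int) []event {
	events := make([]event, len(ids))
	for i, id := range ids {
		next := nextAfterLast
		if i+1 < len(ids) {
			next = ids[i+1]
		}
		events[i] = event{ID: id, Private: next}
	}
	return events
}

// registry holds the resume position. ackLastEvent mimics an
// ACKLastEvent-style callback that only receives the Private value of
// the last ACKed event.
type registry struct{ resumeFrom int }

func (r *registry) ackLastEvent(private interface{}) {
	if next, ok := private.(int); ok {
		r.resumeFrom = next
	}
}
```

Because the stored value already points at the first unacknowledged event, a restart can simply read from `resumeFrom` without any deduplication.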

If you cannot tell the next ID in advance, a simple solution would be to do 'deduplication' in your beat on startup. Just store the last ACKed event ID. On startup, read the same event again, but drop it immediately (don't forward it to libbeat). What's nice about this approach is that if your data source is empty and you cannot read any more events, the current event's ID is still valid and allows you to restart on a data source whose events have all been processed already.
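The startup deduplication can be as small as the following sketch. It assumes the beat re-reads rows with IDs greater than or equal to the stored last ACKed ID, in ascending order (a hypothetical query such as `WHERE id >= ? ORDER BY id`), and represents rows by their IDs only; `resumeAfter` is a made-up helper.

```go
package main

// resumeAfter returns the rows to publish after a restart. rows holds the
// IDs re-read from the source, in ascending order, starting at lastACKed
// (the ID stored in the registry). The first row was already ACKed before
// the restart, so it is dropped instead of being forwarded to libbeat.
// haveState is false on a first run, when nothing has been stored yet.
func resumeAfter(rows []int64, lastACKed int64, haveState bool) []int64 {
	if haveState && len(rows) > 0 && rows[0] == lastACKed {
		return rows[1:]
	}
	return rows
}
```

When the source has nothing new, the result is empty but `lastACKed` remains a valid restart point, which is the property described above.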

Alternatively you can use ACKCount. It's a local callback one configures when connecting to the pipeline. Internally, the other event-based ACK handlers actually reuse the ACKCount callback, as ACKCount guarantees the callback is called in order. Using ACKCount, the event-based ACK handlers store an array of active events. Once the ACK handler is called, the array of events is used to determine already ACKed events and pending events. The ACKed events are removed from the array. With this approach, the first event in your array is the actual pending one. Some sample code:

type producer struct {
  client beat.Client

  mu     sync.Mutex
  events []*event // published but not yet ACKed, in publish order
}

// init code, connection to the publisher pipeline:
func newProducer(config Config, pipeline beat.Pipeline) (*producer, error) {
  ...

  p := &producer{}

  client, err := pipeline.ConnectWith(beat.ClientConfig{
    PublishMode:   beat.GuaranteedSend, // configure infinite retry
    EventMetadata: config.EventMeta,    // producer local `fields` and `tags` settings
    Processor:     config.Processor,    // producer local event processors
    WaitClose:     config.WaitClose,    // allow user to configure additional wait on shutdown
    ACKCount:      p.onACK,             // ack handler, called in publish order
  })
  if err != nil {
    return nil, err
  }

  p.client = client
  return p, nil
}

// onACK receives the number of events ACKed since the last call, in publish order.
func (p *producer) onACK(n int) {
  p.mu.Lock()
  pending := p.events[n:]
  p.events = pending
  p.mu.Unlock()

  if len(pending) > 0 {
    // always forward 'continuation' state for serialization in case the beat
    // crashes or the machine has a power outage (storeNext is not shown)
    p.storeNext(pending[0].ID())
  }
}

// next returns the id of the pending event to continue processing after restart:
func (p *producer) next() string {
  p.mu.Lock()
  defer p.mu.Unlock()
  if len(p.events) == 0 {
    return ""
  }
  return p.events[0].ID()
}

func (p *producer) publish(e *event) {
  p.mu.Lock()
  p.events = append(p.events, e)
  p.mu.Unlock()

  // publish event to libbeat (conversion of e to beat.Event elided)
  p.client.Publish(...)
}

As one wants to continue processing from the last ACKed event, kill -9 or a power outage should be a concern as well. In filebeat/winlogbeat the registry file is written every now and then.


(Ecc256) #5

Steffen,
Once again, thanks a lot for your response, it’s very helpful!

Yep, I did as you described. It’s quite simple indeed. My test code is below, if you want to comment on it.

I think I understand how it works, too. Via ACKCount one can get access to the whole event, not just its Private part, but it does need a bit more code.

I don’t see any difference between ACKEvents and ACKCount from "kill -9 or power outage" perspective. Am I wrong about it?

The only problem left is how to write the last ACK ID reliably (without corruption) in case of a power outage. I recall seeing code for this in registrar.writeRegistry().

package beater

type sqlbeat struct {
	...
}
...
func (bt *sqlbeat) ackEvents(data []interface{}) {
	if len(data) == 0 {
		return // guard against indexing an empty batch
	}
	// Private values arrive in publish order, so the last one is the newest ACKed ID.
	if lastID, ok := data[len(data)-1].(int); ok {
		logp.Info("==> Last ACK event: %d", lastID) // Save lastID here...
	}
	logp.Info("==> ACK batch events: %d", len(data))
}

func (bt *sqlbeat) Run(b *beat.Beat) error {
	logp.Info("sqlbeat is running! Hit CTRL-C to stop it.")

	var err error
	err = b.Publisher.SetACKHandler(beat.PipelineACKHandler{
		ACKEvents: bt.ackEvents,
	})
	if err != nil {
		return err
	}

	bt.client, err = b.Publisher.Connect()
	if err != nil {
		return err
	}

	counter := 0
	for counter < 1000 {
		bt.client.Publish(beat.Event{
			Timestamp: time.Now(),
			Fields: common.MapStr{ "counter": counter, },
			//Fields: common.MapStr{ },
			Private: counter,
		})
		counter++
	}
	logp.Info("==> Events published total: %d", counter)

	time.Sleep(1 * time.Second)
	return nil
}

(Steffen Siering) #6

ACKEvents and ACKCount are indeed quite similar. The same number of events is ACKed. ACKEvents is a little more convenient, as some bookkeeping is moved into libbeat. ACKCount allows for some more custom handling, though.

Why did you choose ACKEvents and not ACKLastEvents? The data will be much smaller with ACKLastEvents. ACKEvents returns all events published and ACKed. ACKLastEvents returns only the very last event in the batch of ACKed events. If the event sequence number is ever increasing (think rowID), the last one should be good enough. This reduces the number of state updates you have to process.

Right now you will have to handle the 'registry' yourself. In the (unfortunately far) future this functionality will be provided by libbeat. The plan is to have the beat set the Private field only and libbeat will automatically update the registry for you.

Three implementations come to mind for updating the registry:

  1. Use a key-value store with proper transaction and fsync support (e.g. BoltDB). Alternatively, use SQLite or any other embedded, transactional store.
  2. Use the filebeat approach: write the registry into a new temporary file, fsync it, rename it to replace the original file, then fsync the parent directory.
  3. Use snapshots + append logs, e.g. a JSON file with the actual state and updates. State entries must have a unique key. Each state is one JSON object, and each update is one JSON object. Append recent updates to the file and fsync from time to time. When you've accumulated too many updates, create a new snapshot (see approach 2) to remove the update messages (this can be done asynchronously). On restart, just stream through the registry file using json.Decoder. At some point you will hit EOF or a parsing error. A parsing error indicates some loss/corruption due to a missing fsync (e.g. power outage). But as the snapshot was complete when the file was created, the corruption at the end of the file can be "ignored".

Option 1 is quite simple: just use BoltDB and forward state updates to a worker that updates BoltDB. Options 2 and 3, on the other hand, allow your users to access and process the current state using external scripts, e.g. scripts that delete rows that have already been processed and ACKed by your beat. Feel free to steal the registry update code from filebeat or winlogbeat (Apache 2 License).
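For option 3, replaying the registry on restart could look like the sketch below. The record layout (`key`, `last_id`) and the `replay` helper are assumptions for illustration; a real registry would define its own format. The key point is that `json.Decoder` reads one object at a time, so a half-written record at the end simply ends the replay.

```go
package main

import (
	"bytes"
	"encoding/json"
	"io"
)

// entry is one state record. Key must be unique per state (for example a
// table name); later records for the same key override earlier ones.
// The field names are made up for this sketch.
type entry struct {
	Key    string `json:"key"`
	LastID int64  `json:"last_id"`
}

// replay streams a snapshot-plus-updates registry file and returns the
// newest state per key. A decode error is treated as a torn write at the
// end of the file (e.g. power loss before fsync): everything decoded up to
// that point is kept and the rest is ignored. torn reports whether that happened.
func replay(data []byte) (state map[string]int64, torn bool) {
	state = map[string]int64{}
	dec := json.NewDecoder(bytes.NewReader(data))
	for {
		var e entry
		err := dec.Decode(&e)
		if err == io.EOF {
			return state, false
		}
		if err != nil {
			return state, true
		}
		state[e.Key] = e.LastID
	}
}
```

After a torn replay, writing a fresh snapshot (approach 2) removes the damaged tail before new updates are appended.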


(Ecc256) #7

I didn’t notice it. It was right next to ACKEvents, but I still managed to miss it…
Thanks for pointing it out!
It took less than a minute to switch to ACKLastEvents! :slight_smile:

That’ll be great!
A couple of registry implementations, one SQL/transactional and one file-system based, is all one needs, IMO.

There is a transactional database in pretty much any environment I know.
I’m not sure adding an extra dependency to a beat is worth it, but that’s for end users to decide.

Thanks, I do appreciate it!
My beat pulls data from SQL already, I’ll use it for tracking too.


(Ecc256) #8

Steffen,

Sorry to bug you again, I have a couple more questions:
If I connect with GuaranteedSend

bt.client, err = b.Publisher.ConnectWith(beat.ClientConfig{
	PublishMode: beat.GuaranteedSend,
})

and remove
time.Sleep(1 * time.Second)
at the very end of the Run() method,
the log shows just this:
2018-04-27T17:57:10.419-0400 INFO instance/beat.go:308 sqlbeat stopped.

And with time.Sleep() it looks like this:
2018-04-27T17:56:43.914-0400 INFO instance/beat.go:308 sqlbeat stopped.
2018-04-27T17:56:43.919-0400 INFO elasticsearch/client.go:690 Connected to Elasticsearch version 6.2.4
2018-04-27T17:56:43.921-0400 INFO template/load.go:73 Template already exists and will not be overwritten.
2018-04-27T17:56:43.940-0400 INFO beater/sqlbeat.go:108 ==> Last ACK/Published IDs event: 425090/425090

I.e. if there are very few events, my beat completes before the Elasticsearch connection is established, and thus nothing is published...
How do I wait for Elasticsearch connection before exiting Run()?

BTW: Adding WaitClose: bt.config.WaitClose to beat.ClientConfig changes nothing.
Only time.Sleep() helps.

Another issue is very similar.
My beat updates the database in ackLastEvents().
The connection to the database is created in Run().
Run() completes before ackLastEvents() is executed.
Naturally, I cannot use defer db.Close() in Run().
Where/when should I close the database connection then?
I usually do such cleanup in a destructor, but there is no such thing in Go...
I’m quite a Go newbie still. :blush:

  • Opening a database connection is a relatively slow operation; that’s why it’s better to reuse the existing one.
    Plus, I run a prepared SQL statement to speed up updates too.

(Steffen Siering) #9

This is the 'wait on shutdown timeout' support I explained in my first post in this topic.

Right now the shutdown timeout cannot be configured globally. But a local shutdown wait should work (given you have configured a local ACK handler).

E.g. update your code to:

bt.client, err = b.Publisher.ConnectWith(beat.ClientConfig{
  PublishMode: beat.GuaranteedSend,
  WaitClose: 10 * time.Second,
  ACKCount: func(n int) {
    // ignore
  },
})
defer bt.client.Close()

If the client still has pending un-ACKed events, the Close method waits for at most WaitClose.

Btw. don't use bt.client. With 6.0, one should use one client per worker collecting events/documents.

First make sure you stop collecting data. On shutdown, we can still wait for events to be published.

The shutdown sequence in your beat should be:

  • stop collecting events
  • close connection to database
  • close beat.Client // <- this waits if WaitClose + ACK

As database access or event publishing might block, the clients should be closed asynchronously.


(Steffen Siering) #10

For global shutdown support, one needs to enable it in libbeat. The library part already supports a global wait on close (see the pipeline.Settings type passed to the pipeline constructor).

But the pipeline loader (creating a pipeline from user configs) hard-codes the shutdown wait to 0. The reason it is hard-coded to 0 is that filebeat requires some more refactoring first, so we can remove its internal shutdown timeout. In the future, the wait on shutdown will be exposed to all beats via a common setting.

If you want to give it a try, I'm fine with already adding global settings to configure the wait on shutdown, but the default should be 0 so as not to interfere with filebeat.

Normally the beat's Run returns first (beat.Client still should be closed), indicating the beat has finished collecting data. Once Run returns, the publisher pipeline will be turned down. This is where the global wait on shutdown kicks in. The tricky part is the registry file (that's why filebeat has its own shutdown handling), as the global registry file needs to be closed after the pipeline is closed. Once registry support is integrated into libbeat, libbeat will take care of closing the registry properly. The registry integration into libbeat is the main reason we don't have the global wait on shutdown available yet.


(Ecc256) #11

Aha!
It seems I was missing defer bt.client.Close()

Didn’t quite get it…
How does it affect my/custom code exactly?

The database is the registry; it can be closed only after the last ACK is executed.

The beat's Run is the only place where database connection can be closed as of now, right?

I guess I’d better wait until the following is done:

I have run tests with and without time.Sleep(3 * time.Second) before Run() returns.
I got a mismatch between the ID reported by ACKLastEvent and the pipeline itself.
This is for "libbeat": "7.0.0-alpha1"

With time.Sleep(3 * time.Second)
beater/perfmon.go:103 ==> Last ACK/Published IDs event: 24976/25000
beater/perfmon.go:103 ==> Last ACK/Published IDs event: 25000/25000
beater/perfmon.go:243 ==> bt.client.Close()
beater/perfmon.go:228 ==> bt.db.Close()

"output":{"events":{"batches":501,"duplicates":25000,"total":25000},"read":{"bytes":411932},"type":"elasticsearch","write":{"bytes":9392669}},
"pipeline":{"clients":0,"events"{"active":0,"published":25000,"retry":50,"total":25000},
"queue":{"acked":25000}}},"system":{"cpu":{"cores":4}}}}}

without time.Sleep(3 * time.Second)
beater/perfmon.go:103 ==> Last ACK/Published IDs event: 24926/25000
beater/perfmon.go:103 ==> Last ACK/Published IDs event: 24976/25000
beater/perfmon.go:243 ==> bt.client.Close()
beater/perfmon.go:228 ==> bt.db.Close()

"output":{"events":{"batches":501,"duplicates":25000,"total":25000},"read":{"bytes":411936},"type":"elasticsearch","write":{"bytes":9392669}},
"pipeline":{"clients":0,"events":{"active":0,"published":25000,"retry":50,"total":25000},
"queue":{"acked":25000}}},"system":{"cpu":{"cores":4}}}}}


(Steffen Siering) #12

Btw. don't use bt.client. With 6.0, one should use one client per worker collecting events/documents.

Didn’t quite get it…
How does it affect my/custom code exactly?

It only affects your beat if you have multiple workers/sources publishing to libbeat. If you do everything in your Run method, it's no issue.

Normally the beat's Run returns first (beat.Client still should be closed), indicating the beat has finished collecting data.
Once Run returns, the publisher pipeline will be turned down.

The beat's Run is the only place where database connection can be closed as of now, right?

It depends. Stop is supposed to tell the beat to stop. If the DB query can block for too long, you might try to close the connection on Stop. But given DBs normally don't block that long, closing in Run should be OK.

I have run tests with and without time.Sleep(3 * time.Second) before Run() returns.
I got a mismatch between the ID reported by ACKLastEvent and the pipeline itself.

This looks like a race. You are comparing values in 2 goroutines here, plus there are a few channels and some processing in between. The metric you highlighted is reported by the output. The output returns the ACK to the backing queue. The backing queue potentially batches the ACKs with still-pending ACKs (not yet reported due to some internal blocking). The batched ACKs are used to clear space in the queue itself and to figure out which beat.Client instances have been ACKed. As you're using a global callback, these will be collected in the correct order and finally passed to another channel, which will then call your ACK handler.

Did you apply any changes to your code (besides waiting before bt.client.Close())?


(Ecc256) #13

I think I did everything you suggested. I’ve written a tiny test beat to show the problem (not the race condition, though).
With time.Sleep(bt.config.RunSleep) commented out, the very last ACK is always executed when isRegistryOpen == false. (isRegistryOpen mocks isDatabaseOpen.)
It’s either a (subtle?) bug in my code or I have to wait until you integrate Registry into libbeat.

package beater

import (
	"strconv"
	"time"

	"github.com/elastic/beats/libbeat/beat"
	"github.com/elastic/beats/libbeat/common"
	"github.com/elastic/beats/libbeat/logp"

	"testbeat/config"
)

var (
	logX            *logp.Logger
	lastPublishedID int
	isRegistryOpen  bool
)

type test struct {
	done   chan struct{}
	config config.Config
	client beat.Client
}

func New(b *beat.Beat, cfg *common.Config) (beat.Beater, error) {
	config := config.DefaultConfig
	if err := cfg.Unpack(&config); err != nil {
		return nil, err
	}

	bt := &test{
		done:   make(chan struct{}),
		config: config,
	}
	return bt, nil
}

func (bt *test) ackLastEvent(data interface{}) {
	if data != nil {
		if lastAckID, ok := data.(int); ok {
			if lastAckID == lastPublishedID {
				logX.Infof("==> Last Published event IDs is ACKed: %d", lastAckID)
			}
		}
	}
	if !isRegistryOpen {
		logX.Warn("==> Cannot update registry...")
	}
}

func (bt *test) publishRow(id int) {
	event := beat.Event{
		Timestamp: time.Now(),
		Meta:      common.MapStr{"id": strconv.FormatInt(int64(id), 10)},
		Fields:    common.MapStr{"id": id},
		Private:   id,
	}

	bt.client.Publish(event)
	lastPublishedID = id
}

func (bt *test) run() error {
	for count, eventCount := 0, bt.config.EventCount; count < eventCount; count++ {
		bt.publishRow(count + 100000000)
	}
	logX.Infof("==> Events published: %d", lastPublishedID)

	//time.Sleep(bt.config.RunSleep)
	return nil
}

func (bt *test) Run(b *beat.Beat) error {
	logX = logp.NewLogger(b.Info.Beat)

	isRegistryOpen = true
	defer func() { isRegistryOpen = false }()

	var err error
	bt.client, err = b.Publisher.ConnectWith(beat.ClientConfig{
		PublishMode:  beat.GuaranteedSend, // infinite retry
		WaitClose:    bt.config.WaitClose, // additional wait on shutdown
		ACKLastEvent: bt.ackLastEvent,
	})
	if err != nil {
		return err
	}
	defer bt.client.Close()

	return bt.run()
}

func (bt *test) Stop() {
	bt.client.Close()
	close(bt.done)
}

(Steffen Siering) #14

Can you share the complete beat (e.g. via gist)? I would like to give it a test run.

Which value did you use for bt.config.WaitClose?


(Ecc256) #15

gist
WaitClose: 3s


(Steffen Siering) #16

Thank you for the test code.

I modified your test code a little:

type test struct {
	done   chan struct{}
	config config.Config
	client beat.Client

	printACK atomic.Bool
}

...

func (bt *test) ackLastEvent(data interface{}) {
	if data != nil {
		if lastAckID, ok := data.(int); ok {
			if bt.printACK.Load() || lastAckID == lastPublishedID {
				logX.Infof("==> Last Published event IDs is ACKed: %d", lastAckID)
			}
		}
	}

	if !isRegistryOpen {
		logX.Warn("==> Cannot update registry...")
	}
}

...

// replace `defer bt.client.Close()` with
	defer func() {
		bt.printACK.Store(true)

		logX.Info("closing beat.Client")
		bt.client.Close()
		logX.Info("closed beat.Client")
	}()

This change prints some more status before and after closing (including timestamps, thanks to the logger). Before calling Close, the printACK flag is set to true, forcing the ACK handler to print all ACKs that happen while Close blocks.

Due to having timeouts on close and never blocking forever, there is always a chance of race conditions. Libbeat considers the worker state invalid once the beat.Client has been closed. That is, any local ACKs arriving after bt.client.Close returns will be filtered out (the local callback will not be run anymore).

All in all, the tests ran fine for me. Well, I was publishing to /dev/null, so the output was never really creating back-pressure. Back-pressure from the output can affect the timing as well, potentially causing some ACKs to be missed because the client is already closed.

I did run the beat with ./testbeat -e -v -c test.yml > /dev/null (logs are printed to stderr :wink: ).

For this configuration:

test:
  EventCount:   1000000
  WaitClose:	3s

output.console.pretty: true

queue.mem:
  flush.timeout: 1s

I did always get this output:

2018-05-10T14:07:13.391+0200 INFO [test] beater/test.go:74 ==> Events published: 100999999
2018-05-10T14:07:13.391+0200 INFO [test] beater/test.go:99 closing beat.Client
2018-05-10T14:07:13.408+0200 INFO [test] beater/test.go:46 ==> Last Published event IDs is ACKed: 100999423
2018-05-10T14:07:14.401+0200 INFO [test] beater/test.go:46 ==> Last Published event IDs is ACKed: 100999999
2018-05-10T14:07:14.401+0200 INFO [test] beater/test.go:101 closed beat.Client

What's interesting about this output is the 'delay'. Checking the timestamps, there is a noticeable delay of about 1 second between the last 2 ACKs. The memory queue buffers up to 2048 events before flushing the buffer and forwarding the batch to the outputs. With the given configuration, the last batch holds only 576 events (1,000,000 mod 2048), so we have to wait for the flush timeout.
Btw, one can set flush.timeout: 0s in order to stream much smaller batches directly to the outputs.
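The batch arithmetic behind this output can be checked directly. The sketch assumes, as the log suggests, that the memory queue hands out batches at exact 2048-event boundaries; `lastBatch` is a hypothetical helper.

```go
package main

// lastBatch models a queue that forwards events in batches of exactly
// minEvents and holds the remainder until flush.timeout expires. Events
// are numbered from firstID. It returns the ID of the last event in the
// last full batch and the size of the final partial batch.
func lastBatch(total, minEvents, firstID int) (lastFullID, remainder int) {
	full := total / minEvents
	remainder = total % minEvents
	lastFullID = firstID + full*minEvents - 1
	return lastFullID, remainder
}
```

With the test configuration (1,000,000 events, IDs starting at 100000000) it yields 100999423 and 576: the first ACK in the log, and the size of the batch that had to wait for flush.timeout.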

Changing my queue setting in the config file to this:

queue.mem:
  flush.timeout: 10s

Then ACKs were indeed missing:

2018-05-10T14:06:45.511+0200 INFO [test] beater/test.go:99 closing beat.Client
2018-05-10T14:06:45.528+0200 INFO [test] beater/test.go:46 ==> Last Published event IDs is ACKed: 100999423
2018-05-10T14:06:48.513+0200 INFO [test] beater/test.go:101 closed beat.Client

As the queue batching up events is shared between all beat.Client instances, the queue can't be flushed if one beat.Client is closing. That is intended behavior. The global ACK handling and WaitClose functionality (which is not exposed yet) should not suffer from this problem.

My recommendation for now would be:

  • Use local WaitClose and set WaitClose as big as possible. E.g. systemd waits (it's configurable) up to 30s before forcing a shutdown. In this case I would set WaitClose to 25s or even 28s. This also helps in case your output is blocking for a short amount of time.
  • Wait for the registry file and global WaitClose to become available via libbeat. Still, we are working with timeouts, so there are always some races between networks and internal pipelines going on.
  • Set an id field via Meta. This will use your custom ID as the document ID when indexing. The Elasticsearch output uses create when indexing events with a custom id. Events with an ID that is already indexed will be rejected by Elasticsearch, giving you some deduplication behavior. This is also helpful in case of temporary network errors, as the beat will have to retry sending events, potentially generating duplicates if no id is present.

Your sample code already creates an id. Also ensure this id is truly unique. When reading from a database, the id could be made out of the database source name + table name + row id, e.g. <hash of db url + table name>-<row id>.
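A sketch of the `<hash of db url + table name>-<row id>` scheme using only the Go standard library. `documentID` is a hypothetical helper; the 0-byte separator and truncating the SHA-256 digest to 16 hex characters are arbitrary choices for illustration. The result would go into the event's Meta as `id`, as in the test beat above.

```go
package main

import (
	"crypto/sha256"
	"encoding/hex"
	"fmt"
)

// documentID builds "<hash of db url + table name>-<row id>". Hashing keeps
// hostnames or credentials in the URL out of the document ID. The 0 byte
// separates URL and table so "ab"+"c" and "a"+"bc" hash differently.
func documentID(dbURL, table string, rowID int64) string {
	sum := sha256.Sum256([]byte(dbURL + "\x00" + table))
	return fmt.Sprintf("%s-%d", hex.EncodeToString(sum[:8]), rowID)
}
```

The same row read from the same table always maps to the same ID, so a retried or re-read event collides with its earlier copy instead of creating a duplicate.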


(Steffen Siering) #17

I also wonder about your use-case.

Normally beats are run as a service/daemon, collecting data all the time. But as you describe it, it sounds like you want to run the beat once (potentially restarting from the last state) in order to transfer some data at will, and expect the beat to shut down once it's finished (cron job?). For this latter use case you will need to introduce some bookkeeping yourself in the ACK handler and shut down only once all events have been ACKed. In this use case, WaitClose adds some additional timeout, giving the beat a chance to finish publishing some scheduled events in case the beat is stopped before it has finished. The "run once" use case is not really supported by libbeat, but I think it could be added relatively easily by enforcing a block without timeout until all events are published. The problem is, you never know how long a run will take, as the network and even indexing in Elasticsearch can create quite some back-pressure, or extend the overall runtime if unavailable (network outage).


(Ecc256) #18

How do you do it exactly?
It’s not done via test.yml, is it?

Could you link an example or modify the test beat code to show such a case, please?

Yep, that’s exactly why I set the id in the first place.

Could you clarify this a bit, please?
I assume an event with the same ID will be rejected if exactly the same event already exists in Elasticsearch (i.e. all fields are the same, not just the ID).
The event will be updated otherwise.
Am I wrong about it?

Yep. This is what this thread is all about.
Thanks to your help, it works in "schedule/run once" mode just fine already! :smile:


(Ecc256) #19

I just tested it by posting events with different @timestamp values.
If the new and old events go into the same index, the new one is rejected.
If not, there will be 2 events with the same _id in different indexes.
Does it mean that, to update an event with the same _id, we need to find the old event and delete it first if they go into the same index?
Is there an easier way to do it?


(Steffen Siering) #20

Sorry for answering late, I just returned from vacations.

Well, I was publishing to /dev/null, so the output was never really creating back-pressure.

How do you do it exactly?

Just use console output and start the beat with <beatname> > /dev/null. Logs and errors are written to stderr, while events are written to stdout.

As the queue batching up events is shared between all beat.Client instances, the queue can't be flushed if one beat.Client is closing.

Could you link an example or modify the test beat code to show such a case, please?

What kind of code do you mean? The queue cannot be flushed. Normally each goroutine wanting to publish events should have its own beat.Client. E.g. in your use case, consider multiple data collectors, each collecting documents/events using a different SQL query or even a different database. A very artificial example:


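type Runner interface {
  Run()
  Stop()
}

// factory creates a collector bound to its own beat.Client (signature assumed for this example).
type factory func(client beat.Client) (Runner, error)

func startCollectors(pipeline beat.Pipeline, collectorFactories []factory) {
  for _, newCollector := range collectorFactories {
    client, err := pipeline.ConnectWith(...) // one beat.Client per collector
    if err != nil { panic("ups") }

    runner, err := newCollector(client)
    if err != nil { panic("ups") }

    go runner.Run()
  }
}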

Events with an ID that is already indexed will be rejected by Elasticsearch, giving you some deduplication behavior.

Could you clarify this a bit, please?
I assume an event with the same ID will be rejected if exactly the same event already exists in Elasticsearch (i.e. all fields are the same, not just the ID).
The event will be updated otherwise.
Am I wrong about it?

When publishing events to Elasticsearch (bulk API or per-document index API), an operation type is configured. If the id is configured, the op_type is set to create. Otherwise the op_type is index by default. The index type would overwrite your document. This means your old document will be marked as deleted (no space might actually be freed yet) and a new document with the same _id will be indexed. The create type, used if id is set, has "put-if-absent" semantics. As a document is identified by its ID, not its contents, any document with an already existing ID will be rejected as already present.
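To make the difference concrete, here is a toy in-memory model of one index. It is not an Elasticsearch client and models only the op_type semantics described above; `index`, `opIndex` and `opCreate` are hypothetical names. Since `_id` uniqueness is per index, two daily indices would behave like two separate maps, which matches the observation that the same `_id` can exist in different indexes.

```go
package main

import "errors"

// errConflict stands in for the per-item 409 version conflict Elasticsearch
// reports when create targets an _id that already exists.
var errConflict = errors.New("document already exists")

// index is a toy model of one Elasticsearch index: documents keyed by _id.
type index map[string]string

// opIndex models op_type index: any existing document with this _id is replaced.
func (ix index) opIndex(id, doc string) {
	ix[id] = doc
}

// opCreate models op_type create (put-if-absent): an existing _id is
// rejected even if the new document's contents differ.
func (ix index) opCreate(id, doc string) error {
	if _, exists := ix[id]; exists {
		return errConflict
	}
	ix[id] = doc
	return nil
}
```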

The Elasticsearch output in Logstash supports a configurable action type. See docs.

The use case for introducing id in Meta is deduplication. That's why the create op_type is used. E.g. on network errors an event might be indexed multiple times due to retries (if the HTTP response gets lost after the operation completed).

I guess you want to be able to overwrite documents/events that have already been indexed? For this to work, we would have to introduce an index action type to Meta. But always using index can create additional indexing workload in the presence of errors.