Libbeat does not allow updates when publishing to Elasticsearch?


(Steven T.) #1

NOTE: After doing additional research and managing to find the code preventing updates, I've revised this post to be more accurate and succinct.

ISSUE: When using the beat.Client to publish [bulk] beat.Events to the Elasticsearch output, the client does not allow updates by way of including the document id in the Meta map.

Sample code:

	client.Publish(beat.Event{
		Fields: common.MapStr{
			"field1": "abc",
		},
		Timestamp: time.Now(),
		// Setting "id" in Meta tells the Elasticsearch output to use
		// "123" as the document _id.
		Meta: common.MapStr{
			"id": "123",
		},
	})

The first time the document is published, it is successfully created. The second time the document is published (with updated values), an error is generated indicating "version conflict, document already exists".

I've tracked it down to the following code in libbeat/outputs/elasticsearch/client.go (version 6.4 of github.com/elastic/beats):

417     if id != "" {
418    	  return bulkCreateAction{meta}, nil
419    	}
420      return bulkIndexAction{meta}, nil

If an id is provided, the client always sends a 'create' action. If I comment out lines 417-419, I'm able to update documents by including the document id in the Meta map and letting the Bulk API decide whether to create or update the document via the 'index' action, just as it does with the REST calls below.

POST _bulk
{ "index" : { "_index" : "my-test-index", "_type" : "doc", "_id" : "123" } }
{ "field1": "abc" }

{
  "took": 1650,
  "errors": false,
  "items": [
    {
      "index": {
        "_index": "my-test-index",
        "_type": "doc",
        "_id": "123",
        "_version": 1,
        "result": "created",
        "_shards": {
          "total": 2,
          "successful": 2,
          "failed": 0
        },
        "_seq_no": 0,
        "_primary_term": 1,
        "status": 201
      }
    }
  ]
}


POST _bulk
{ "index" : { "_index" : "my-test-index", "_type" : "doc", "_id" : "123" } }
{ "field1": "xyz" }

{
  "took": 76,
  "errors": false,
  "items": [
    {
      "index": {
        "_index": "my-test-index",
        "_type": "doc",
        "_id": "123",
        "_version": 2,
        "result": "updated",
        "_shards": {
          "total": 2,
          "successful": 2,
          "failed": 0
        },
        "_seq_no": 1,
        "_primary_term": 1,
        "status": 200
      }
    }
  ]
}
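The behavioral difference between the two REST calls above comes down to the action name in the bulk metadata line: 'index' creates the document, or replaces it and bumps _version if the _id already exists, while 'create' fails with a version conflict. A minimal Go sketch that builds such a metadata line (the function and names here are my own illustration, not libbeat's internals):

```go
package main

import (
	"encoding/json"
	"fmt"
)

// bulkActionLine builds the metadata line for one Bulk API operation.
// action is "index" (create-or-replace by _id) or "create" (fail with a
// version conflict if the _id already exists) -- the distinction at the
// heart of this thread.
func bulkActionLine(action, index, docType, id string) (string, error) {
	meta := map[string]map[string]string{
		action: {"_index": index, "_type": docType, "_id": id},
	}
	b, err := json.Marshal(meta)
	if err != nil {
		return "", err
	}
	return string(b), nil
}

func main() {
	// With "index", re-publishing id 123 reports "updated"; with
	// "create" the same request would return a 409 conflict.
	line, _ := bulkActionLine("index", "my-test-index", "doc", "123")
	// prints {"index":{"_id":"123","_index":"my-test-index","_type":"doc"}}
	fmt.Println(line)
}
```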

My question is, is this a bug or a feature?

Is it a feature, meaning there was some rationale for the client preventing bulk updates, such as the order in which updates to the same document are applied not being guaranteed to match the order in which they were published? Or is it a bug, possibly old code that made sense given the features supported by previous versions?

I'd like to be able to take advantage of the Bulk API's ability to do upserts via the 'index' action using the beat.Client, but I want to understand whether it was prevented for a still-valid reason before submitting an issue against elastic/beats.

Thanks.


(ruflin) #2

So far Beats has focused on time series data, which means each event is written only once and never updated afterward. So you could say it's by design, or at least the reason we never hit this issue.

Can you share a bit more on the Beat you are writing and why you need to overwrite documents?


(Steven T.) #3

Thank you for your response.

Basically, I have two use cases that require the data. The first, which has a higher-priority need to access the data, only ever needs the latest event values. The second, more in line with typical time series use cases, queries across time ranges and needs all events.

Indices can hold anywhere from several hundred thousand to roughly a hundred million events per day. The largest index, however, has only a couple thousand distinct events, even though it receives fifty to a hundred million events for the day.

Querying the indices for the first use case needs to be very efficient (sub-second). With time-sorted indices, some of the queries were at times taking 9-10 seconds or more; each typically has one nested aggregation, two levels deep: one level to aggregate by the uniqueness criteria, and the other to get the latest hit with the few fields needed. After moving to a dual-index model, where one index holds all events for the day and the other uses updates to keep only the latest version of each event, the same queries execute against the "latest event" index in several milliseconds.
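For context, the "latest event" style of query described above can be sketched in Query DSL roughly as follows (the index name, aggregation names, and field names here are placeholders, not my actual mappings):

```json
GET my-latest-index/_search
{
  "size": 0,
  "aggs": {
    "by_key": {
      "terms": { "field": "uniqueness_key" },
      "aggs": {
        "latest": {
          "top_hits": {
            "size": 1,
            "sort": [ { "@timestamp": { "order": "desc" } } ],
            "_source": [ "field1" ]
          }
        }
      }
    }
  }
}
```

Against the update-in-place index, the terms aggregation only ever sees one document per key, which is what makes the query fast.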

I'm aware I could do something similar in Logstash, taking a single record and publishing it to the two different indices. However, that also means introducing another moving part into the architecture, including another hop between the beat and Elasticsearch and another component consuming resources. Simply allowing updates from libbeat eliminates the need to introduce Logstash (granted, I may still need Logstash if there's no guarantee of indexing order for bulk events).


(ruflin) #4

Basically, what you would need from libbeat is that, in case you define the id on your end, we overwrite the document with a new version instead of returning an error. Could you open a feature request for this on GitHub? I'm pretty sure there is more to it on our end, but having a GitHub issue will also bring visibility to it for other engineers.


(Steven T.) #5

Thank you. I have submitted a feature request per your suggestion.

https://github.com/elastic/beats/issues/8534

Regards.

-Steve


(system) #6

This topic was automatically closed 28 days after the last reply. New replies are no longer allowed.