Entity-Centric Indexing - reliability and performance

I get the general principle of Mark Harwood's entity-centric indexing pattern, and I've written some code, and it works.

The problem I've got is around the "periodic extracts sorted by entity ID and time" - how do you do this so that you (a) don't miss any data and (b) don't burn resources processing the same data over and over again?

Scenario: log entries get written to a log file, picked up by Filebeat, parsed by Logstash, and stored in Elasticsearch. Then there's a Python script (loosely based on Mark's example code) to search for relevant new log records and create/update the entity-centric documents.

Fine, but how do I search for "new" log records? At present I'm running the script once a minute, because that seems a reasonable maximum time for our Ops people to have to wait before being told something is broken, and the script looks at all documents less than an hour old, based on @timestamp, because I've seen a Filebeat instance get an hour behind with its processing before now.

The problems with this are obvious:

  • if a Filebeat instance were to get more than an hour behind there would be some records that the script would never look at
  • because I'm scanning an hour's worth of log documents every minute, I'm looking at each relevant one 60 times, which means doing 60 times as much work as necessary.

I can't see any way to (a) guarantee that all new log documents do get looked at by the script and (b) not waste resources repeatedly processing the same data.

One thing that has occurred to me is to stop using @timestamp and instead use an ingest pipeline to generate an ingest timestamp and use that. I can then guess that it will almost never take more than a minute from the ingest timestamp until the document is actually indexed, and make my script look back two minutes based on the ingest timestamp instead of 60 minutes based on @timestamp.
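
To make that concrete, the query I have in mind is something like this (the field name ingested and the index pattern are just placeholders at this point):

# Sketch of the two-minute lookback based on an ingest-time field; the field
# name "ingested" and the index pattern are placeholders.
from elasticsearch import Elasticsearch

es = Elasticsearch()

query = {
    "query": {
        "range": {
            "ingested": {
                "gte": "now-2m",   # look back two minutes of ingest time
                "lt": "now"        # explicit upper bound
            }
        }
    }
}

response = es.search(index="filebeat-*", body=query, size=1000)
hits = response["hits"]["hits"]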

This would, I think, reduce the chances of missing data, but not to zero, and reduce the amount of duplicated wasted work, but not to zero.

What have I missed? Or is that really the best that can be done?

Why not get your Python script to store the last timestamp it saw at the end of each run, and then base its time-range lower bound on that value the next time it runs? If you are worried about delayed log records appearing after you've already gone past that timestamp, then this approach combined with the ingest_time field you suggested should be sufficient?
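
Something like this, perhaps - purely a sketch, with the checkpoint file, index pattern and field names made up:

# Sketch of a checkpoint-based lower bound: remember the newest ingest
# timestamp processed on the previous run and start the next run from there.
import json
import os

from elasticsearch import Elasticsearch

CHECKPOINT_FILE = "last_run.json"   # made-up location for the checkpoint

es = Elasticsearch()

def load_checkpoint():
    if os.path.exists(CHECKPOINT_FILE):
        with open(CHECKPOINT_FILE) as f:
            return json.load(f)["last_ingested"]
    return "now-1h"                 # first run: fall back to a fixed lookback

def save_checkpoint(timestamp):
    with open(CHECKPOINT_FILE, "w") as f:
        json.dump({"last_ingested": timestamp}, f)

lower_bound = load_checkpoint()
query = {
    "query": {"range": {"ingested": {"gt": lower_bound, "lt": "now"}}},
    "sort": [{"ingested": "asc"}],
}
response = es.search(index="filebeat-*", body=query, size=1000)
hits = response["hits"]["hits"]
# ... create/update the entity-centric documents from hits ...
if hits:
    save_checkpoint(hits[-1]["_source"]["ingested"])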

Date of ingest will be the date to use for syncing, rather than the date of the log record, which could arrive out of sequence.
A basic approach might be: if you store a "last_seen_event" date on entities in your entity index, then your query on the event store would be for all events since the max last_seen_event on your entity store, sorted by entityID and time.
I expect there are update-failure scenarios where this could lead to some dropped events, so two mitigating strategies might be:

  1. Have a "ragged edge", i.e. the query on the event store is potentially personalised for each entity, e.g. all events for entity A since A1, all events for entity B since B1, etc.
  2. Have a pessimistic baseline - a single time for the last run, which either marks the start of the run or the completion time for the whole run once you've 100% guaranteed all the latest events are applied.

Both of these have pros and cons: 1) would potentially create some horrible/inefficient queries, and 2) would require re-playability of certain events (update scripts would need to be idempotent).
In both cases the query on the event store would have to include both a "from" and a "to" of less than "now", to ensure stability of the set of event docs being applied.
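
Roughly, for option 2 - very much a sketch, with made-up field and index names, and assuming idempotent updates:

# Sketch of option 2 (pessimistic baseline): a fixed "to" bound keeps the set
# of events stable, and the baseline only advances after the whole run succeeds.
from datetime import datetime, timezone

from elasticsearch import Elasticsearch, helpers

es = Elasticsearch()

def run_once(baseline):
    """Apply all events in [baseline, to) to the entity index, then return the
    new baseline. Updates must be idempotent so a failed run can be replayed."""
    to = datetime.now(timezone.utc).isoformat()   # fixed upper bound, < "now" for the whole run
    query = {
        "query": {"range": {"ingested": {"gte": baseline, "lt": to}}},
        "sort": [{"entity_id": "asc"}, {"ingested": "asc"}],
    }
    for event in helpers.scan(es, index="logs-*", query=query, preserve_order=True):
        pass   # ... apply an idempotent update to the entity for this event ...
    return to  # persist this somewhere only once everything above has succeeded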

This BTW is a notion rather than based on practical experience of building such a thing.

Because data is coming in from more than one server via more than one instance of Filebeat and there's no guarantee that they're synchronised. So a record timestamped 10:30 can arrive from server 1, and some time later a relevant record from server 2 can arrive timestamped 10:25, which I would miss if I were then only looking for records after 10:30.

Ta - will study when I'm not in the middle of something else.

I'd already sussed that my code would probably have to be idempotent, and have written and tested it accordingly.


You could potentially add a field containing the index timestamp through an Ingest Script processor. This would allow Elasticsearch to set the timestamp just before indexing the document, and should be more accurate than setting it at an earlier stage, as there may be retries delaying the event reaching Elasticsearch. Once you have such a timestamp (assuming clocks are synchronised across the cluster), you could fetch data based on this field and be reasonably sure (?) not to miss events or process them twice, as long as you lag a bit behind to account for the refresh interval and the time refresh processing can take.
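
As an illustration, the lag can simply go into the upper bound of the range clause - the 30 second figure and the field name are arbitrary:

# Query window that deliberately lags behind "now" to allow for the refresh
# interval and refresh processing time; 30s and "ingested" are placeholders.
lagged_window = {
    "range": {
        "ingested": {
            "gte": "now-2m",    # lower bound, or the checkpoint from the last run
            "lt": "now-30s"     # stay behind the refresh interval
        }
    }
}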

I'm not sure that what I suggest fits your business or technological needs, but I suggest looking at a Store-First architecture. Maybe you can save the event documents, just as they arrive, in some other external middleware (such as Kafka or RabbitMQ) before they are processed and indexed in Elasticsearch. This would give you the ability to wake up a procedure (some kind of script/application) every fixed or dynamic interval and pick up what you missed since your last run. This way you avoid most of the read queries and scroll requests to Elasticsearch for identifying the latest event document. In addition, it might help you process the events for the entity indices with multiple scripts instead of one, consuming the Store-First queue in parallel for much higher throughput. If you want to use only the ELK stack itself, maybe you can just store the data first in a separate index, other than the event and entity indices (although I think that's odd, since Elasticsearch isn't considered a database or a queue, but a search engine).
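
A sketch of what that could look like with kafka-python - the topic and consumer group names are invented, and this is only to illustrate the idea:

# Store-First sketch: events are buffered in Kafka and the entity updater
# consumes them at its own pace; committing the offset only after the entity
# update succeeds means nothing is silently skipped.
import json

from kafka import KafkaConsumer            # kafka-python
from elasticsearch import Elasticsearch

es = Elasticsearch()
consumer = KafkaConsumer(
    "log-events",                          # invented topic name
    bootstrap_servers=["localhost:9092"],
    group_id="entity-updater",             # invented consumer group
    enable_auto_commit=False,              # commit manually, after processing
    value_deserializer=lambda v: json.loads(v.decode("utf-8")),
)

for message in consumer:
    event = message.value
    # ... apply an idempotent update to the entity document for this event ...
    consumer.commit()                      # the offset records how far we have got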

Thanks - I'll bear that in mind for future reference. Kafka is likely to feature in our future, but it's not part of our current system and it would be OTT to add it just for this purpose at present.

Elasticsearch scroll requests on the event store will do several things of value for you:

  1. Efficiently query only the docs since a given time (earlier comments here seemed to assume you'd have to scan all docs)
  2. Provide a stable point-in-time view of the data for the duration of a scroll (i.e. not endlessly chasing new content)
  3. Sort the docs by entity_id, then date (important for making a single consolidated update to each entity rather than one per event).

I must have been misunderstanding "scroll" and the Python library then. I'm using helpers.scan with preserve_order=True, because I thought I needed that to get the results in the order I want, but the documentation says preserve_order=True "negates the performance benefits of using scan"?

My original example script was reviewed and improved by the author of the elasticsearch Python client library.

I've just tested it again sorting 27m tweets by author ID and date and it looks to be working here.
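
For reference, the usage looks roughly like this - field and index names are placeholders, and preserve_order=True is what makes the sort in the query apply across the whole scroll:

# Sketch of a sorted scroll over the event store, grouped so each entity gets
# one consolidated update rather than one update per event.
from itertools import groupby

from elasticsearch import Elasticsearch, helpers

es = Elasticsearch()

query = {
    "query": {"range": {"ingested": {"gte": "now-2m", "lt": "now"}}},
    "sort": [{"entity_id": "asc"}, {"@timestamp": "asc"}],
}

hits = helpers.scan(es, index="logs-*", query=query, preserve_order=True)

for entity_id, events in groupby(hits, key=lambda h: h["_source"]["entity_id"]):
    # ... fold all of this entity's new events into a single update ...
    pass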

So how do I create the ingest timestamp, then? As per the documentation I'd expect

PUT _ingest/pipeline/ingest_timestamp
{
  "description" : "Pipeline which adds an ingest timestamp to each document",
  "processors" : [
    {
      "set" : {
        "field": "ingested",
        "value": "{{_ingest.timestamp}}"
      }
    }
  ]
}

with the mapping for the field set as

"ingested": { "type": "date" },

but that gets me

"error"=>
{
	"type"=>"mapper_parsing_exception",
	"reason"=>"failed to parse [ingested]",
	"caused_by"=>
    {
		"type"=>"illegal_argument_exception",
		"reason"=>"Invalid format: \"Tue Jul 18 16:00:03 BST 2017\""
    }
}

???

I found this issue which looks to be related: https://github.com/elastic/elasticsearch/issues/23168

Yes, I found that, but what it doesn't appear to have is any answer/workaround. If you search further for "ingest.new_date_format" you find that it was later deprecated and then removed. And this wasn't because the original problem was fixed, because the original problem is still there.

I sincerely hope this isn't how it's done:

Yeah, I was going to try a Painless script next. One might hope that it allows access to the new Java 8 date-munging classes, so it won't need quite so much gibberish. TBC.

This is what I'm using elsewhere, in Java:

ZonedDateTime.now().format( DateTimeFormatter.ISO_INSTANT )

... which doesn't work, because now() isn't allowed, but the version you pointed at, nasty though it is, does work, because Date() is allowed, despite doing essentially the same thing as now().
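
For the record, a minimal sketch of the kind of script-processor pipeline being discussed, expressed via the Python client; it sidesteps the date-format problem by storing epoch milliseconds (which the default date mapping accepts), and it assumes getTime() is whitelisted alongside Date():

# Sketch only: an ingest pipeline whose Painless script stores the ingest time
# as epoch milliseconds, avoiding the unparseable "Tue Jul 18 ..." string.
# Assumes new Date().getTime() is on the Painless whitelist; older 5.x
# releases take "inline" rather than "source" for the script body.
from elasticsearch import Elasticsearch

es = Elasticsearch()

es.ingest.put_pipeline(
    id="ingest_timestamp",
    body={
        "description": "Adds an ingest timestamp (epoch millis) to each document",
        "processors": [
            {
                "script": {
                    "lang": "painless",
                    "source": "ctx.ingested = new Date().getTime();",
                }
            }
        ],
    },
)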
