Hi friends, I'm new to ES and need some help from y'all. How can I handle out-of-order updates? Some context: we're building a custom monitoring system and plan to send events to ES via Kafka (Producers > Kafka > Consumer > ES). Basically we are trying to keep track of two things:
1. An event log of an entity that could have field1, field2, ..., fieldN. There could be multiple events for this entity, each with only a few fields populated (since they come from different sources), e.g. event1 can have field1, field2, field3 populated and event2 can have field3, field4, field5 populated. (First index name: History)
2. A consolidated/most up-to-date status built from these events: basically one row/document per entity that has all these fields merged from multiple events, e.g. field1, field2, field3, field4, field5. In this case field3 (which is common to both events) should hold the value from the most recent event (whichever came later). (Second index name: Current)
So we basically want to make sure the ordering of events is preserved and updates apply properly to the "Current" index. How can we accomplish this at the field level (so that updates to each field abide by this)? One thing we thought of was keeping a "last_updated" timestamp for each field and checking that timestamp before updating the field, but that could be expensive since we would be reading before every write/update. We also read about versioning, but if I understood correctly that applies at the document level, not the field level. Any suggestions on what we could do about this?
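To make that concrete, the rough document shape we had in mind for "Current" is something like this (index, id and field names are just placeholders):

```
PUT current/_doc/entity-123
{
  "entity_id": "entity-123",
  "field1": "a",
  "field1_last_updated": "2024-06-01T10:00:00Z",
  "field3": "c",
  "field3_last_updated": "2024-06-01T10:05:00Z"
}
```

Before applying a new event, the consumer would fetch the doc and only overwrite field3 if the event's timestamp is newer than field3_last_updated - which is exactly the read-before-write we'd like to avoid.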
Also, does this use case justify using Elasticsearch?
The traditional approach to this would be to store each event as time-based data; then you can use aggregations - like top hits, to get the latest events by timestamp - to grab summarised data.
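As a rough sketch (I'm assuming field names like entity_id and @timestamp here), getting the most recent event per entity from your History index would look something like:

```
GET history/_search
{
  "size": 0,
  "aggs": {
    "per_entity": {
      "terms": { "field": "entity_id", "size": 1000 },
      "aggs": {
        "latest_event": {
          "top_hits": {
            "size": 1,
            "sort": [ { "@timestamp": { "order": "desc" } } ]
          }
        }
      }
    }
  }
}
```

That gives you the latest whole event per entity at query time, without having to maintain a second index.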
Your need to consolidate does make it a little more difficult, but you could look at a rollup or transform for that, depending on specific requirements.
@warkolm Thank you for responding. Heads up, I'm new to ES.
Can you please elaborate a little bit on this?
I assume you are saying to use the timestamp as the _id? My thought was to query this index by the entity id (which is a keyword field in our document), and that should return all documents that match it; the _id would be the timestamp. But I did not get the aggregations, top hits and summarised data part - why do we need aggregations and summarised data here?
Do you think use case 2, the "Current" index, where we keep updating the same doc as new events are ingested (comparing timestamps to avoid processing old events), is considered suboptimal? It basically means reading the timestamp for each field in the document before updating it.
EDIT: What does ES recommend we do for something like case 2 (the consolidation) while maintaining the ordering of updates? Reads before writes can be expensive though.
Have a timestamp in there, but also have your entity ID that you can search on. The entity ID is unique, and then the event time is (usually) unique, but an entity itself can create multiple events.
In that way, it's not conceptually much different from a log event from a host, right?
This is where aggregations can help, as you may not need to build another index if you can build an agg query that gives you the results you want without having to index again.
You still can if you want, and that's where rollups or transforms might be able to help.
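As a rough example (index and field names assumed again), a continuous "latest" transform that keeps the most recent document per entity in a separate index would look roughly like this:

```
PUT _transform/current-entities
{
  "source": { "index": "history" },
  "dest":   { "index": "current" },
  "latest": {
    "unique_key": [ "entity_id" ],
    "sort": "@timestamp"
  },
  "sync": {
    "time": { "field": "@timestamp", "delay": "60s" }
  }
}

POST _transform/current-entities/_start
```

Keep in mind that keeps the latest *document* per entity, not a field-by-field merge, so it may or may not cover your consolidation requirement.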
This is where you get the "it depends" answer, as there are a few paths to a solution, and it really comes down to which one you're comfy taking based on your needs.
Let me have a think about a solution here and get back to you. Someone else might also have ideas they can share.
To give a little more context: basically, we are emitting these events from different systems, and each system emits different fields (some may be common, but some will be different), e.g.
System 1 emits fields: time, host ID, health status
System 2 emits fields: time, host ID, host role, group ID
System 3 emits fields: time, host ID, target OS version
What we want is to merge/consolidate all these fields into one place: time, host ID, health status, host role, group ID, target OS version
like a single row in SQL, so it is easy to query
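i.e. a single document per host, roughly like this (values made up):

```
{
  "host_id": "host-42",
  "time": "2024-06-01T10:07:00Z",
  "health_status": "HEALTHY",
  "host_role": "frontend",
  "group_id": "group-7",
  "target_os_version": "1.2.3"
}
```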
This is needed as we want to display all fields of this host on the UI properly. (not sure if they should be aggregated/consolidated at write time or query time?)
Importantly, we need to perform filtering operations as well. The UI will have options like: show all hosts (with all fields) with role abc, or show all hosts with target OS version xyz. There may be many other fields that can be used to filter.
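For example (just my guess at what such a query might look like, assuming host_role is a keyword field in the "Current" index):

```
GET current/_search
{
  "query": {
    "bool": {
      "filter": [
        { "term": { "host_role": "abc" } }
      ]
    }
  }
}
```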
So whenever you update a document, you simply insert the individual field value with its timestamp. Ensure the global "time" doesn't get overwritten with an older timestamp.
We basically keep a history of all values within each individual field.
Just ensure the list is bounded to, say, < 30 entries by deleting the oldest timestamp first.
That way, your global "time" tells you the latest document update time, and each individual field tells you when that field was updated to its value, etc.
You need to massage the query result a little bit by returning only the latest value for each field. This little overhead is tolerable for us, since we always massage the query result before returning it to the caller (an API layer called from the GUI, etc.). Or just return the entire document and let the caller handle it (JavaScript if it's a web UI, etc.).
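Using your host example, a document ends up looking roughly like this (field names are just placeholders):

```
{
  "host_id": "host-42",
  "time": "2024-06-01T10:07:00Z",
  "health_status": [
    { "value": "DEGRADED", "ts": "2024-06-01T10:07:00Z" },
    { "value": "HEALTHY",  "ts": "2024-06-01T09:30:00Z" }
  ],
  "host_role": [
    { "value": "frontend", "ts": "2024-05-30T08:00:00Z" }
  ]
}
```

The first entry of each list is the current value; the older entries are the history.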
Why is this needed? For context, we need a merged document with all fields. The global time does not matter to us since we are querying by the host_ID; each field's time matters so we don't overwrite a new value with an old one. Or am I misunderstanding?
Is this done by Elasticsearch, or by a background job in your service?
If I understand correctly, you are managing the out-of-order case by keeping the history for each field and fetching the latest one at run time? That means faster writes, since it is append-only, but slower reads?
The global time is a way to know when the latest document update was. If you don't care, then ignore/remove it. We often sort documents by most recently updated, and a global timestamp helps with such a query; without it, you would need to work out which field was updated most recently to perform the same query.
The writer is the one that limits the size for us. Merging is by definition read -> modify -> write, so your code should be able to just remove the oldest entry after merging. What we have done is ensure the list stays sorted by timestamp, so all we need to do is remove the entry at the end when the size limit is exceeded.
*If you don't limit the size, it will eventually make indexing take longer than necessary.
Yes. A timestamp for each field removes the out-of-order issue, plus we insert into an ordered list.
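We do the merge in our writer, but if you would rather push it into ES, a scripted upsert along these lines could do the same thing in one call (field names and values are made up, and this is only a sketch, not something we run verbatim):

```
POST current/_update/host-42
{
  "scripted_upsert": true,
  "upsert": { "host_id": "host-42" },
  "script": {
    "lang": "painless",
    "params": {
      "value": "DEGRADED",
      "ts": "2024-06-01T10:07:00Z",
      "max_entries": 30
    },
    "source": """
      // create the per-field history list on first write
      if (ctx._source.health_status == null) {
        ctx._source.health_status = [];
      }
      // append the new {value, ts} entry for this field
      ctx._source.health_status.add(['value': params.value, 'ts': params.ts]);
      // keep the newest entry first; ISO-8601 UTC strings in the same
      // format sort correctly as plain strings
      ctx._source.health_status.sort((a, b) -> b.ts.compareTo(a.ts));
      // bound the history by dropping the oldest entries
      while (ctx._source.health_status.size() > params.max_entries) {
        ctx._source.health_status.remove(ctx._source.health_status.size() - 1);
      }
      // only ever move the global time forward
      if (ctx._source.time == null || params.ts.compareTo(ctx._source.time) > 0) {
        ctx._source.time = params.ts;
      }
    """
  }
}
```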
It might seem like time is wasted on the read, but it actually isn't. A database schema can't be changed easily, so most DB schemas will not match exactly what the UI needs; an API layer will always end up being introduced to translate documents before returning a new payload to the UI.
There's not much extra time during the read phase from our experience.
The reason we keep the fields in a sorted list is that the UI can simply read the first/last entry (depending on the sort order) of each field to get the latest value, so it also works without the API layer. In that scenario there is no extra read time; it might just take the UI longer to display. (We do have both scenarios: some payloads are translated and some are raw DB payloads.)
There are many reasons we ended up with such a scheme. With a history of every field, we can know the exact duration of a value, say the IP of a client: we can tell a device had this IP from t1 -> t2, etc.
You might need to experiment to see if it fits your use case. With a DB, less is more.