Hot/warm data idempotency and vertically scaled ES infrastructure using Curator

I have a hot/warm implementation which is orchestrated with Curator, and it works really well. However, there is a design-related thought that has been bugging me for a while, and I'd like to discuss it with you.

Like many others, I have a stream of data which is indexed into the hot instance(s). This data is "curated" over to the warm instances using Curator once a couple of TBs have been indexed. We must be 100% sure that data is idempotently indexed in ES while being transferred from hot instances to warm - in other words, we don't accept duplicated data. As we WILL run into situations where already indexed data re-appears on the hot instance(s), how can we make sure that the re-allocated index's data is "upserted" on the warm instances when they're rolled over, shrunk, etc.?

One more Curator-related question. When re-allocating to the warm nodes, is it possible to specify instances instead of physical nodes? For various reasons we have decided to scale vertically, meaning we run a number of ES instances on one big machine, and I'd like to be able to specify localhost:portx, localhost:porty, localhost:portz as the hosts so that the deterministic algorithm that picks a warm instance operates on instances rather than machines. Maybe that's possible already?

I hope I have stated my "case" clearly enough - if not, please don't hesitate to ask me to clarify things :slight_smile:

Many thanks in advance!

I would recommend you read this blog post about duplicate prevention.

Hi Christian!

And thank you - your link was/is spot on! Based on the information provided, I'm thinking of a process flow something like the following (in pseudocode):

  1. "Initiate a rollover on the index currently pointed to by alias 'x' based on index size >= 1.4TB"
  2. "Determine the index that was 'rolled over'"
  3. "Initiate a scroll based on, say, a '2 000 000 docs window'"
  4. "Get batch of '2 000 000 docs'"
  5. "Prepare Upserts on 'warm' docs based on doc id (hex encoded timestamp + md5 based content hash)"
  6. "Perform bulk operations"
  7. "Repeat steps 4-6 until no more docs"
  8. "Force merge upserted index"
  9. "Delete rolled over index (residing on hot)"

The process would be realized using the Java APIs - something along the lines of the sketch below.
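To make that concrete, here is a rough sketch of steps 1-9, assuming the 7.x Java High Level REST Client (package names shift a bit between versions) and made-up alias/index names ("x", "warm-000001"):

```java
import java.io.IOException;
import java.security.MessageDigest;
import java.security.NoSuchAlgorithmException;

import org.elasticsearch.action.admin.indices.delete.DeleteIndexRequest;
import org.elasticsearch.action.admin.indices.forcemerge.ForceMergeRequest;
import org.elasticsearch.action.bulk.BulkRequest;
import org.elasticsearch.action.bulk.BulkResponse;
import org.elasticsearch.action.search.ClearScrollRequest;
import org.elasticsearch.action.search.SearchRequest;
import org.elasticsearch.action.search.SearchResponse;
import org.elasticsearch.action.search.SearchScrollRequest;
import org.elasticsearch.action.update.UpdateRequest;
import org.elasticsearch.client.RequestOptions;
import org.elasticsearch.client.RestHighLevelClient;
import org.elasticsearch.client.indices.rollover.RolloverRequest;
import org.elasticsearch.client.indices.rollover.RolloverResponse;
import org.elasticsearch.common.unit.ByteSizeUnit;
import org.elasticsearch.common.unit.ByteSizeValue;
import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.search.SearchHit;
import org.elasticsearch.search.builder.SearchSourceBuilder;

public class HotToWarmMigration {

    private static final TimeValue KEEP_ALIVE = TimeValue.timeValueMinutes(5);

    public static void migrate(RestHighLevelClient client) throws IOException {
        // 1-2. Roll over alias "x" once the backing index reaches ~1.4TB and
        //      capture the name of the index that was rolled over.
        RolloverRequest rollover = new RolloverRequest("x", null);
        rollover.addMaxIndexSizeCondition(new ByteSizeValue(1400, ByteSizeUnit.GB));
        RolloverResponse rolled = client.indices().rollover(rollover, RequestOptions.DEFAULT);
        if (!rolled.isRolledOver()) {
            return; // size condition not met yet
        }
        String hotIndex = rolled.getOldIndex();

        // 3. Open a scroll over the rolled-over index. A scroll page is capped
        //    by index.max_result_window (10 000 by default), so the
        //    "2 000 000 docs window" is really consumed as many smaller pages.
        SearchRequest search = new SearchRequest(hotIndex)
                .scroll(KEEP_ALIVE)
                .source(new SearchSourceBuilder().size(10_000));
        SearchResponse page = client.search(search, RequestOptions.DEFAULT);
        String scrollId = page.getScrollId();

        // 4-7. Turn every hit into an upsert on the warm index, keyed by the
        //      existing doc id, and send each page as one bulk request.
        while (page.getHits().getHits().length > 0) {
            BulkRequest bulk = new BulkRequest();
            for (SearchHit hit : page.getHits().getHits()) {
                bulk.add(new UpdateRequest("warm-000001", hit.getId())
                        .doc(hit.getSourceAsMap())
                        .docAsUpsert(true)); // insert if absent, overwrite if present
            }
            BulkResponse bulkResponse = client.bulk(bulk, RequestOptions.DEFAULT);
            if (bulkResponse.hasFailures()) {
                throw new IllegalStateException(bulkResponse.buildFailureMessage());
            }
            page = client.scroll(new SearchScrollRequest(scrollId).scroll(KEEP_ALIVE),
                    RequestOptions.DEFAULT);
            scrollId = page.getScrollId();
        }
        ClearScrollRequest clear = new ClearScrollRequest();
        clear.addScrollId(scrollId);
        client.clearScroll(clear, RequestOptions.DEFAULT);

        // 8-9. Force merge the upserted warm index, then drop the hot copy.
        client.indices().forcemerge(
                new ForceMergeRequest("warm-000001").maxNumSegments(1), RequestOptions.DEFAULT);
        client.indices().delete(new DeleteIndexRequest(hotIndex), RequestOptions.DEFAULT);
    }

    // Deterministic doc id from step 5: hex-encoded timestamp + MD5 content hash.
    static String docId(long timestampMillis, byte[] source) throws NoSuchAlgorithmException {
        StringBuilder id = new StringBuilder(Long.toHexString(timestampMillis));
        for (byte b : MessageDigest.getInstance("MD5").digest(source)) {
            id.append(String.format("%02x", b & 0xff));
        }
        return id.toString();
    }
}
```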

What do you say - does it make sense?

The whole point of having a hot/warm architecture is to do all the I/O-intensive indexing on the hot nodes, which have much better disks. Here it sounds like you will be indexing the data twice, which will result in a lot of overhead. While you are reindexing the data you also run the risk of seeing duplicate data in your searches.

If duplicate prevention is important, I would probably recommend staying with time-based indices instead of rollover. What is the point of having the rolled-over indices be a specific size if you are soon going to delete them anyway?

Unfortunately we have a very "skewed" disk setup: 2TB of SSDs versus 64TB of spinning disks. We WILL index all incoming data on the "hot" SSDs, but we can only store a relatively small amount of data before sending it over to the "warm" disks, which have roughly 32 times more storage. It is critical that a doc NEVER shows up more than once, whether it resides in "hot" or "warm". We have zero tolerance for duplicates due to the use cases we're looking at. We will also repeatedly receive a lot of "out-of-bounds" data, making it really hard to put it in time-based indices - the data might show up weeks (months?) after it was created...

How large a portion of the total data set is expected to show up late (after the data has been migrated to the warm nodes)? Is this likely to be duplicate data or mostly new data?

Unfortunately it is really hard to say - from our point of view we MUST be 100% certain indexing is idempotent; anything else is a "failure"! I'd love to say that "most of the data will be new", but I simply can't... sorry... I KNOW we will run into situations where data from certain sources will be re-transferred without us knowing it happened/happens.

How about something like this then?

Let us assume you keep X days' worth of data on the hot nodes. In your ingest pipeline you check whether a document falls within this time period or not. If it does, go ahead and index it directly into time-based indices using e.g. a hash-based ID. If it is "old", temporarily queue it up outside Elasticsearch and keep track of which indices it will be written to. This assumes that data coming in late does not need to be indexed immediately.

Then have a periodic process that moves an old index from warm to hot, indexes all the related data that has been queued up, and then force merges it and moves it back to warm.

This ensures there are no duplicates and should be easier to automate. All indexing also takes place on the hot nodes.

If the portion of late documents is small, it may be fine to just let them be indexed on the warm nodes.
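A minimal sketch of the routing step, assuming daily time-based indices and a hypothetical durable queue (Kafka, a database table, whatever you prefer) - every name below is a placeholder:

```java
import java.io.IOException;
import java.time.Duration;
import java.time.Instant;
import java.time.ZoneOffset;
import java.util.Map;

import org.elasticsearch.action.index.IndexRequest;
import org.elasticsearch.client.RequestOptions;
import org.elasticsearch.client.RestHighLevelClient;

public class LateDataRouter {

    private final RestHighLevelClient client;
    private final Duration hotWindow;     // the X days kept on the hot nodes
    private final LateDocQueue lateQueue; // hypothetical durable queue

    public LateDataRouter(RestHighLevelClient client, Duration hotWindow, LateDocQueue lateQueue) {
        this.client = client;
        this.hotWindow = hotWindow;
        this.lateQueue = lateQueue;
    }

    public void route(long eventTimestampMillis, String hashBasedId, Map<String, Object> source)
            throws IOException {
        Instant eventTime = Instant.ofEpochMilli(eventTimestampMillis);
        // Daily time-based index name, e.g. "events-2018-06-05".
        String index = "events-" + eventTime.atZone(ZoneOffset.UTC).toLocalDate();

        if (eventTime.isAfter(Instant.now().minus(hotWindow))) {
            // Recent doc: index straight into its time-based index on the hot
            // nodes. The deterministic (hash-based) id means a re-delivery
            // simply overwrites the existing doc instead of duplicating it.
            client.index(new IndexRequest(index).id(hashBasedId).source(source),
                    RequestOptions.DEFAULT);
        } else {
            // Late doc: park it outside Elasticsearch, tagged with its target
            // index, until the periodic warm->hot->warm run picks it up.
            lateQueue.enqueue(index, hashBasedId, source);
        }
    }

    // Hypothetical interface - back it with whatever durable store you like.
    public interface LateDocQueue {
        void enqueue(String index, String id, Map<String, Object> source);
    }
}
```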

Hmmmmm, let me think about this for a while. BTW, are you situated in Sweden? Just asking, as this would open up possibilities to meet up and discuss this in more detail. Up to this point I've been able to stay very generic, but if we need to get into more detail we'll need to discuss this "elsewhere"!

As I said, I'll look into your suggestion and let you know what I come up with based on the info. Thank you so much for the input!

I am Swedish, but based in the UK.

Ok - I'll contact you in the near future. Tack så mycket (thanks a lot) :slight_smile:

Just a quick question...

How does this cope with "temporally skewed data inflow" when using time-based indices? That is the reason we opted for a size-based rollover process in the first place. Some indices will have a relatively small number of shards while others will have a relatively large number of shards...

It does not, so shard sizes are likely to vary depending on how much data was received. How much does your data flow fluctuate? Is having indices of different sizes a problem?

You can adjust the shard count through the shrink and split APIs if sizes get extreme in either direction, but that also comes at a bit of a cost with respect to complexity.
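For reference, a shrink looks roughly like this with the low-level Java REST client (the index and node names below are made up):

```java
import java.io.IOException;

import org.elasticsearch.client.Request;
import org.elasticsearch.client.RestClient;

public class ShrinkExample {

    // Shrinks a placeholder 6-shard index down to 1 primary shard.
    static void shrink(RestClient restClient) throws IOException {
        // _shrink requires the source index to be read-only with a copy of
        // every shard on a single node, so set that up first.
        Request prepare = new Request("PUT", "/events-000007/_settings");
        prepare.setJsonEntity("{"
                + "\"index.blocks.write\": true,"
                + "\"index.routing.allocation.require._name\": \"warm-node-1\""
                + "}");
        restClient.performRequest(prepare);

        // The target's shard count must be a factor of the source's. Setting
        // the two preparation settings to null clears them on the new index.
        Request shrink = new Request("POST", "/events-000007/_shrink/events-000007-shrunk");
        shrink.setJsonEntity("{ \"settings\": {"
                + "\"index.number_of_shards\": 1,"
                + "\"index.blocks.write\": null,"
                + "\"index.routing.allocation.require._name\": null"
                + "}}");
        restClient.performRequest(shrink);
    }
}
```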

Ok - I'll think about this a little bit more...

If I initiate a "bulk upsert" using the BulkProcessor with x records (the upserts keyed on the doc ids), will the WriteResponse contain the doc id and action (insert | update) for each document?

The order of the responses will be the same as the order of the requests. What is it you are trying to do?

We're discussing different options. So the response will contain the actual outcome for each upsert - INSERT or UPDATE - for each document. The response order will give me the id unless it is included in the response itself. Either way is fine with me - it makes it possible to correlate id with action.

Yes, you can correlate request and response that way.
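For example, with a BulkProcessor listener it could look something like this (a sketch; note that each item response also carries its doc id, so you do not have to rely on ordering alone):

```java
import org.elasticsearch.action.DocWriteResponse;
import org.elasticsearch.action.bulk.BulkItemResponse;
import org.elasticsearch.action.bulk.BulkProcessor;
import org.elasticsearch.action.bulk.BulkRequest;
import org.elasticsearch.action.bulk.BulkResponse;

public class UpsertOutcomeListener implements BulkProcessor.Listener {

    @Override
    public void beforeBulk(long executionId, BulkRequest request) {
        // nothing to do before the bulk request is sent
    }

    @Override
    public void afterBulk(long executionId, BulkRequest request, BulkResponse response) {
        // Items come back in request order, and each one carries its own id
        // and outcome: CREATED for a fresh insert, UPDATED for an overwrite.
        for (BulkItemResponse item : response.getItems()) {
            if (item.isFailed()) {
                System.err.printf("%s failed: %s%n", item.getId(), item.getFailureMessage());
                continue;
            }
            DocWriteResponse.Result result = item.getResponse().getResult();
            System.out.printf("%s -> %s%n", item.getId(), result);
        }
    }

    @Override
    public void afterBulk(long executionId, BulkRequest request, Throwable failure) {
        failure.printStackTrace(); // the whole bulk request failed
    }
}
```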

Ok. Thanks.
