I have a hot/warm implementation orchestrated with Curator, and it works really well. However, there is a design-related question that has been nagging at me for a while, and I'd like to discuss it with you.
Like many others, I have a stream of data which is indexed into the hot instance(s). This data is "curated" over to the warm instances using Curator once a couple of TBs have been indexed. We must be 100% sure that data is idempotently indexed in ES while being transferred from the hot instances to the warm ones - in other words, we don't accept duplicated data. Since we WILL run into situations where already-indexed data reappears on the hot instance(s), how can we make sure that the re-allocated index's data is "upserted" on the warm instances when the indices are rolled over, shrunk, etc.?
One more thing, Curator-related. When re-allocating to the warm nodes - is it possible to specify instances instead of physical machines? For various reasons we have decided to scale vertically, meaning we run a number of ES instances on one big machine, and I'd like to be able to specify localhost:portx, localhost:porty, localhost:portz as the hosts while Curator performs its deterministic algorithm on the warm instances to determine which instance to use. Maybe that's possible already?
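For what it's worth, Elasticsearch treats every instance (process) as a separate node, so shard allocation is expressed against node attributes rather than host:port pairs. One common approach is to tag each instance in its own elasticsearch.yml (e.g. `node.attr.box_type: warm`) and have Curator's allocation action require that attribute. A minimal sketch - the attribute name and index pattern are assumptions, not taken from this thread:

```yaml
# Curator action file (sketch); assumes each warm instance was started
# with `node.attr.box_type: warm` in its elasticsearch.yml
actions:
  1:
    action: allocation
    description: >-
      Move matching indices to any node (instance) tagged as warm.
    options:
      key: box_type
      value: warm
      allocation_type: require
      wait_for_completion: true
    filters:
      - filtertype: pattern
        kind: prefix
        value: logstash-
```

Since each instance on the big machine is its own node, tagging them individually gives you exactly the per-instance targeting you describe.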
I hope I have stated my "case" clearly enough - if not, please don't hesitate to ask me to clarify things.
The whole point of having a hot/warm architecture is to do all the I/O intensive indexing on the hot nodes that have much better disks. Here it sounds like you will be indexing data twice, which will result in a lot of overhead. While you are reindexing the data you also run the risk of seeing duplicate data in your searches.
If duplicate prevention is important, I would probably recommend staying with time-based indices instead of rollover. What is the point of having the rolled-over indices be a specific size if you are soon going to delete them anyway?
Unfortunately we have a very "skewed" disk setup: 2TB of SSDs versus 64TB of spinning disks. We WILL index all incoming data on the "hot" SSDs, but we can only store a relatively small amount of data there before sending it over to the "warm" disks, which have roughly 32 times more storage. It is critical that a doc NEVER shows up more than once, whether it resides in "hot" or "warm". We have zero tolerance for duplicates due to the use cases we're looking at. We will also repeatedly experience a lot of "out of bounds" data, making it really hard to put it in time-based indices - the data might show up weeks (months?) after it was created...
How large a portion of the total data set is expected to show up late (after the data has been migrated to the warm nodes)? Is this likely to be duplicate data or mostly new data?
Unfortunately it is really hard to say. From our point of view we MUST be 100% certain that indexing is idempotent - anything else is a "failure"! I'd love to say that "most of the data will be new", but I simply can't... sorry... I KNOW we will run into situations where data from certain sources is re-transferred without our knowing it happened/happens.
Let us assume you keep X days' worth of data on the hot nodes. In your ingest pipeline you check whether a document falls within this time period or not. If it does, go ahead and index it directly into time-based indices using e.g. a hash-based ID. If it is "old", temporarily queue it up outside Elasticsearch and keep track of which indices it will be written to. This assumes that data coming in late does not need to be indexed immediately.
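As an illustration, the deterministic hash-based ID plus the hot-window check could look like this. The timestamp field name and the seven-day window are assumptions, not from the thread; hashing the canonical form of the document means a re-sent duplicate maps to the same ID, so indexing it again overwrites rather than duplicates.

```python
import hashlib
import json
from datetime import datetime, timedelta

HOT_WINDOW = timedelta(days=7)  # assumed "X days" kept on the hot nodes

def doc_id(doc: dict) -> str:
    """Deterministic ID: duplicates hash to the same ID regardless of
    the key order they arrive with, making indexing idempotent."""
    canonical = json.dumps(doc, sort_keys=True, separators=(",", ":"))
    return hashlib.sha1(canonical.encode("utf-8")).hexdigest()

def route(doc: dict, now: datetime) -> str:
    """Return 'index' for in-window docs, 'queue' for late arrivals."""
    ts = datetime.fromisoformat(doc["@timestamp"])
    return "index" if now - ts <= HOT_WINDOW else "queue"
```

The same `doc_id` function has to be used for both the live stream and the late-arrival queue, otherwise the upserts won't collide with the originals.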
Then have a periodic process that moves an old index from warm to hot, indexes all the related queued-up data into it, force merges it, and moves it back to warm.
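That periodic pass might be sketched like this, with the actual cluster operations (relocation, bulk upsert, force merge) stubbed out as callbacks, since those depend on the client you use:

```python
from collections import defaultdict

# Late documents queued outside Elasticsearch, keyed by target index.
late_docs = defaultdict(list)

def enqueue(index_name: str, doc: dict) -> None:
    late_docs[index_name].append(doc)

def flush(move_to_hot, bulk_upsert, force_merge, move_to_warm) -> None:
    """One periodic pass: for each index with queued docs, relocate it
    to the hot nodes, upsert the queued docs, force merge, and move it
    back to the warm nodes. The four callbacks wrap your ES client."""
    for index_name in list(late_docs):
        docs = late_docs.pop(index_name)
        move_to_hot(index_name)
        bulk_upsert(index_name, docs)
        force_merge(index_name)
        move_to_warm(index_name)
```

In practice you would also want the pass to batch indices and to run off-peak, since relocating an index back and forth is itself I/O heavy.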
This ensures there are no duplicates and should be easier to automate. All indexing also takes place on the hot nodes.
If the portion of late documents is small, it may be fine to just let them be indexed on the warm nodes.
Hmmmmm, let me think about this for a while. By the way, are you based in Sweden? Just asking, as that would open up the possibility of meeting and discussing this in more detail. Up to this point I've been able to stay very generic, but if we need to get into more detail we'd have to discuss it "elsewhere"!
As I said, I'll look into your suggestion and let you know what I come up with based on the info. Thank you so much for the input!
How does this cope with situations of "temporally skewed data inflow" when using time-based indices? This is the reason we opted for a size-based rollover process in the first place. Some indices will have a relatively small number of shards while others will have a relatively large number of shards...
It does not, so shard sizes are likely to vary depending on how much data was received. How much does your data flow fluctuate? Is having indices of different sizes a problem?
You can adjust the shard count through the shrink and split APIs if the size is extreme in either direction, but that also comes at a bit of a cost with respect to complexity.
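For reference, a shrink goes roughly like this (the index and node names below are made up). The index first has to be made read-only with all of its shards co-located on a single node, then the shrink target is created with the lower shard count:

```
PUT /logs-000007/_settings
{
  "settings": {
    "index.routing.allocation.require._name": "warm-node-1",
    "index.blocks.write": true
  }
}

POST /logs-000007/_shrink/logs-000007-shrunk
{
  "settings": {
    "index.number_of_shards": 1,
    "index.routing.allocation.require._name": null
  }
}
```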
If I initiate a "bulk upsert" using the BulkProcessor with x number of records (the upserts being based on the doc IDs), will the WriteResponse contain the doc ID and the action (insert | update) for each document?
We're discussing different options. So the response will contain the action outcome for each upsert: INSERT or UPDATE for each document. The response order will give me the id, unless it is included in the response itself. That's fine with me if that's how it works - it makes it possible to correlate id with action.
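For what it's worth, the per-item bulk response carries both `_id` and a `result` field ("created" on first insert, "updated" thereafter), so the correlation doesn't have to rely on response ordering. A small parser over that response shape (the sample below is illustrative, not captured from a live cluster):

```python
def summarize_bulk(response: dict) -> list:
    """Extract (doc id, outcome) pairs from an Elasticsearch bulk
    response. For upserts sent as `update` actions, `result` is
    "created" on first insert and "updated" thereafter."""
    pairs = []
    for item in response["items"]:
        # Each item is keyed by its action type: index/create/update/delete.
        (_action, body), = item.items()
        pairs.append((body["_id"], body["result"]))
    return pairs

# Illustrative response shape (ids and statuses are made up).
sample = {
    "items": [
        {"update": {"_id": "a1", "result": "created", "status": 201}},
        {"update": {"_id": "a2", "result": "updated", "status": 200}},
    ]
}
```

With the id in each item, you can correlate outcomes even if you process responses asynchronously.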