How to deal with building huge bulk load indices fast without impacting prod queries or paying a fortune to over-provision the cluster

I sent the following Slack message to a colleague today. He does elasticsearch consulting and usually keeps pretty current on all things elasticsearch. Then I thought maybe other people would have ideas too. Here it is:

Man, every possible solution for getting bulk data into elasticsearch seems kludgy or has some drawbacks. I've thought of 6 different ways of doing it, but none of them really seems good. Or I haven't found the right documentation for a solution.

  • Option 1: Fire up a standalone elasticsearch instance on each pipeline node and build an index in it containing all of the documents from that node, but use the number of shards that the final index in production will use and specify the routing that the final index will use. Snapshot each standalone index. Load the production index from all of the snapshots by doing a Lucene index merge per shard, since each standalone index will have the same number of shards and routing. Rebuild cluster metadata.

  • Problems: Elasticsearch simply doesn't have this feature, I think, but it should. Unfortunately I don't have enough knowledge of the inner workings of elasticsearch to implement this underneath with a Lucene index merge the way Solr does.

  • Option 2: Fire up an elasticsearch cluster on the pipeline nodes with each node running an elasticsearch member. Set custom shard placement so that shard 0 goes to node 0, shard 1 goes to node 1 etc. with the same number of shards as pipeline nodes. Set custom routing to match pipeline partitioning so that all documents on node 0 go to shard 0 etc. Snapshot the index and load it as normal.

  • Problems: Elasticsearch doesn't seem to support this level of detail for routing. You can specify which parameter to route on, but not how exactly that parameter maps to a shard (internally Elasticsearch picks the shard as a hash of the routing value modulo the number of primary shards, and that mapping isn't configurable). It also doesn't seem to support this level of shard placement config: you can specify which nodes shards for a given index will be placed on, but you can't specify exactly which shard (with which routing) must be placed on which node. This solution also forces you into a certain number of shards that might not be optimal for the production cluster.

  • Option 3: Fire up an indexing machine for each pipeline node to join the production elasticsearch cluster while indexing is happening. Somehow use placement or something to send all shards for indexing work to the new indexing machines, and to stop the new machines from taking any query traffic. Increase replicas after indexing and use placement to make all the replicas go to the query nodes. Shut down the indexing nodes and hope one of the replicas on the query nodes becomes the primary appropriately.

  • Problems: We need to pay for a bunch of extra nodes and we have to send everything over network instead of all indexing going to the local machine. Not sure how much this type of thing affects the production cluster. It seems risky poking at the prod cluster like this and trying to get all the placement configuration and stuff just right. It's not clear exactly how much load it will place on the rest of the cluster when all the rebalancing happens and how much control we have over that.

  • Option 4: Fire up a completely separate elasticsearch cluster, similar to Option 3, but don't join the production cluster. Build the index on this cluster and snapshot it. Load the snapshot to prod as normal.

  • Problems: We need to pay for a whole elasticsearch cluster, but it's probably still a better solution than 3 since it doesn't have the rest of the drawbacks. Still seems pretty clunky, and we have to run way more machines and do way more network copying than an optimal solution would require.

  • Option 5: Switch to using the REST client only. Put elasticsearch behind an ELB. Run 2 elasticsearch clusters for batch indices: one cluster for indexing, one cluster for querying. Double buffer the clusters: index into the back cluster while the front cluster takes traffic. After indexing, flip the load balancer, and the new back cluster is ready for indexing.

  • Problems: We need to switch to using the REST client only. We need to run 2 clusters. Otherwise this solution is fairly safe and convenient, but we still have the issue of the indexing solution being suboptimal because of the extra network.

  • Option 6: Fire up a standalone elasticsearch instance on each pipeline node and build an index in it containing all of the documents from that node, using only one shard. Snapshot each index. On the prod cluster, load each index, and use placement to make sure each node gets an appropriate number of indices with appropriate replicas. Do our own query routing in the client instead of using elasticsearch routing, to make sure we are asking the right index for the right documents, or query all the indices together.

  • Problems: This might be the best solution. We will have a lot of indices, but essentially you could now think of each of our indices as a shard, and think of our aliases as our indices. Our client will need to be more in tune with the pipeline to make sure it is using the same routing. We take the power away from elasticsearch to control routing and placement (I'm not sure this is a bad thing). I'm not sure what implications this has for more complex indices, parent child or whatever other features of elasticsearch this might impact. I don't think we currently use anything complex with any of the indices we would do this with.

What do you think about this issue? Do you know what other people are doing about this right now? Is elasticsearch implementing any features that could help with this at all? Anything in 2.0 that helps with this?

It's odd that other people don’t seem to be talking about this super long indexing time issue for large bulk load indices.

So that's it. I'm interested to hear what other people are doing to solve this issue. Our specific environment is AWS, so we have the ability to spawn machines on the fly, and we snapshot to S3. The current indexing task that spawned this train of thought is something in the neighborhood of 250 mil docs and takes roughly 8 hours to build on a 16 core 128G machine. Would be nice to do it on 20 machines in under half an hour, without shipping local data over the network to other nodes, and produce an index with an arbitrary number of shards. The details of the specific case aren't really that important as I'm more interested in general solutions to the problem.

Is 250 mil docs the total data size that you would like to index? And you are trying to index them in less than half an hour with a reasonable number of machines?

Yes. Without impacting production queries.

Data is growing daily though, and new indices for new features are constantly required. I just checked and it's actually closer to 360 mil docs at the moment. That's why I said the details of the specific case that triggered this train of thought aren't that important: I'm interested in a general solution for horizontal scaling of indexing, either in an offline cluster, or on offline machines and then combining afterwards, or on machines in the cluster that don't take query traffic and can be turned off when not indexing.

I prefer to use the ETL pipeline machines themselves with embedded nodes if possible as the data is already on them and already partitioned nicely. Having more machines is just more money, and sending all that data over the network an extra time should be unnecessary. This is the reason I was favoring Option 6, but even Option 6 doesn't really seem like an awesome solution which is why I am searching for other ideas.

Note that something like either Option 1 or Option 2 is really the optimal solution. I couldn't find anything like the Solr index merge tool for elasticsearch though, so I think this functionality is just missing, or well hidden. For Option 2, I couldn't figure out how to make it work with placement and routing, but maybe it's possible somehow; I'm hoping for a solution more along these lines. These solutions allow indexing directly on the ETL pipeline machines without network copy.

Options 3, 4 and 5 I really just included because I knew my buddy would go there if I didn't indicate I'd already thought of them and what the problems are.

Option 6 is super hackish, but it sort of allows an optimal solution in terms of network and hardware on the indexing side. Obviously there are query-time and convenience concessions as a result of that hack though.
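For what it's worth, the client-side routing that Option 6 needs doesn't have to be complicated. Here's a minimal sketch in Java, with made-up index names and a plain string hash standing in for whatever partitioning the pipeline actually uses; the only hard requirement is that the pipeline and the query client share the exact same key-to-partition function:

```java
// Hypothetical helper for Option 6: the ETL pipeline (when writing) and the query
// client (when reading) must use this exact same mapping, otherwise lookups miss.
// The index naming scheme and the hash are illustrative only.
static String indexForKey(String routingKey, int numPartitions) {
    int partition = Math.floorMod(routingKey.hashCode(), numPartitions);
    return "orders_part_" + partition;
}

// Point lookups then hit exactly one small single-shard index...
String index = indexForKey(customerId, 20);   // customerId is a stand-in routing key
// ...while broad searches can still go through one alias (e.g. "orders")
// that spans all of the per-partition indices.
```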

A variation of Option 3 could be to temporarily add a number of nodes specifically aimed at indexing and optimised for it to the cluster. Label all existing search nodes differently from all the new indexing nodes and use shard allocation filtering to make sure all existing indices stay on the search nodes and are not relocated to the indexing nodes. Likewise label the indexing nodes and create the new indices being indexed into so that they are all spread across the indexing nodes, most likely with no replicas enabled.

Then set up a client node per pipeline instance and let it efficiently route each record to the correct node. As long as you are not using dynamic mappings that can cause frequent updates to the cluster state, I believe this should isolate the indexing nodes from the query-serving part of the cluster. Although indexing is not done locally, this approach allows you to create fewer indices with the correct number of shards, and the data needs to be transferred anyway in the end. This also lets Elasticsearch handle routing, which removes the need for potential changes client-side.
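A rough sketch of that client-node-per-pipeline-instance idea, assuming the pre-5.x Java NodeBuilder API and a placeholder cluster name:

```java
import org.elasticsearch.client.Client;
import org.elasticsearch.node.Node;
import static org.elasticsearch.node.NodeBuilder.nodeBuilder;

// Embedded client node inside each ETL pipeline JVM: it holds no data and takes no
// master duties, but it knows the cluster state, so bulk requests are routed
// directly to the nodes that own the target shards.
Node node = nodeBuilder()
        .clusterName("prod-cluster")   // placeholder cluster name
        .client(true)                  // client-only: no data, not master-eligible
        .node();
Client client = node.client();

// ... hand this client to the bulk indexing code ...

node.close();
```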

Once indexing has completed, change the settings for the newly created indices and let Elasticsearch relocate these to the query nodes before shutting down the indexing nodes. There are a number of settings to control how aggressive rebalancing is, which should give you a good level of control. This is, as far as I know, not very different from the process used when restoring a snapshot.
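To make that concrete, here's a rough sketch using the Java admin client, assuming the nodes carry an arbitrary attribute such as `box_type` (e.g. `node.box_type: indexing` vs `node.box_type: search` in elasticsearch.yml) and a made-up index name; on 2.x the builder call is `Settings.settingsBuilder()`:

```java
import org.elasticsearch.client.Client;
import org.elasticsearch.common.settings.Settings;

// 'client' is any Client connected to the cluster.

// While building: keep the bulk index on the indexing nodes only, with no replicas.
client.admin().indices().prepareUpdateSettings("bulk_index")
        .setSettings(Settings.settingsBuilder()
                .put("index.routing.allocation.require.box_type", "indexing")
                .put("index.number_of_replicas", 0)
                .build())
        .get();

// Optionally throttle how aggressively shards are moved around the cluster.
client.admin().cluster().prepareUpdateSettings()
        .setTransientSettings(Settings.settingsBuilder()
                .put("cluster.routing.allocation.cluster_concurrent_rebalance", 1)
                .put("indices.recovery.max_bytes_per_sec", "40mb")
                .build())
        .get();

// Once indexing completes: add replicas and let the shards relocate to the search nodes.
client.admin().indices().prepareUpdateSettings("bulk_index")
        .setSettings(Settings.settingsBuilder()
                .put("index.routing.allocation.require.box_type", "search")
                .put("index.number_of_replicas", 1)
                .build())
        .get();
```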

Yeah we are using no dynamic mappings, at all, anywhere. I should have noted that. Everything is strict.

hmmm ok that's interesting, as restoring snapshots works quite well with us limiting how many resources the cluster uses to load the snapshots. I didn't think of putting a local node on each pipeline machine as a router but that makes sense. I was worried about the rebalancing because we have had issues on node failure before. This might be just that we have not configured rebalancing as tightly as we have configured snapshot loading though. I'll have to look at that.

It might be possible then to have the ETL pipeline machines actually join the prod cluster AS the indexing machines, with the cluster set up as you say, with tagging, so that indexing goes to these machines and not to the query machines. They would then route to each other over the network, but they wouldn't have to snapshot and load from S3, so it should net out to the same amount of data going over the wire in the end, and if the rebalancing can be configured to be as gentle as a snapshot load this could work well.

One drawback is that we have more pipeline machines than query machines, so we will end up with a weird number of shards, or we will have to size up the pipeline machines to do the work on fewer nodes to get a better number of shards, or we could simply leave a few of the pipeline machines without shards during the indexing phase, essentially wasting them. I think having a weird number of shards is probably OK though for the tradeoff of getting to use all of our machines. Hopefully after replicas are all rebalanced it works out to more or less evenly load up the query nodes. We don't do any indexing on the query nodes, so it maybe shouldn't matter which ones receive primaries vs replicas. We are a long way away from overshard death, so it shouldn't be a big problem I guess.

@Adam_Brown Please bear with me as I'm trying to understand what you have, how your data grows, how you want to index data, etc...

  • as your data grows per day, do you re-index all of it, or just the daily set?

  • do you want to index data once a day or continuously throughout the day?

  • how fast can your data grow in terms of percentage, starting with 250mil documents?

  • could the starting point be one ETL pipeline machine/instance with a one-node ES cluster, then grow the cluster with more ETL pipeline machines as the data grows?

  • can "document type" be used to identify where data is coming from instead of routing since the data size per day does not seem that big?

This is where I'm heading... For example, let's say you have 250 mil documents that you want to index as fast as possible, and at the same time you don't want the indexing process affecting searches. I'm assuming you have seen this: if you index 250 mil docs into two shards, in theory each shard will have about 125 mil documents. As the data grows, the ES cluster needs to be expanded but continue to maintain the same performance for indexing and searching. Am I on the right track with what you are trying to accomplish?

We index data per day, per hour and continuously, but this topic is about the per-day stuff. It's actually closer to 360 mil docs now, not 250. New data that comes in changes matches on older data, so the pipeline needs to run on all the data again. We also upgrade our algorithms for data processing over time, and this requires rebuilding the whole thing. It's easiest just to redo the whole thing daily for some data.

Hard to predict how the data will grow. We keep getting asked for more data, getting new clients, improving algorithms that require more data. We need to be able to handle it all.

We are far past the point of one pipeline or query machine being sufficient. Some production things generate like 800+ queries per second already, and the ETL pipeline already uses 16 machines. The bottleneck in my example is the one indexing machine that takes 8 hours to finish and that's the piece of this I'm working on fixing. Indexing needs to get distributed like the rest of the pipeline. That specific example was just to give some context.

Not sure what you mean by using doc type for routing. Certain indices are a single doc type. Some indices are multiple doc types depending on the use case. We certainly do not want all docs of one type to end up on a single shard. That would be bad.

Let's first address the performance issue you have indexing 360 mil docs, to see if this will help.

If you use the ES Java API to index data, there is a class called BulkProcessor that you can use to index data in bulk. It has 4 parameters: bulk size, bulk actions, flush interval, and concurrent requests, which you can play with to see what works and what doesn't with your data and environment.

https://www.elastic.co/guide/en/elasticsearch/client/java-api/current/java-docs-bulk-processor.html

It also allows you to plug in a BulkProcessor.Listener in which you can log the submitted bulk actions, the bulk size, and the time it takes to process (or index, in this case) the number of documents stated by the bulk actions.
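A minimal sketch following the BulkProcessor builder from the docs page above, where `client` is a connected Client, `jsonDoc` is a JSON source string, and the index name and tuning numbers are just placeholders to experiment with:

```java
import java.util.concurrent.TimeUnit;

import org.elasticsearch.action.bulk.BulkProcessor;
import org.elasticsearch.action.bulk.BulkRequest;
import org.elasticsearch.action.bulk.BulkResponse;
import org.elasticsearch.action.index.IndexRequest;
import org.elasticsearch.client.Client;
import org.elasticsearch.common.unit.ByteSizeUnit;
import org.elasticsearch.common.unit.ByteSizeValue;
import org.elasticsearch.common.unit.TimeValue;

BulkProcessor bulkProcessor = BulkProcessor.builder(client, new BulkProcessor.Listener() {
    @Override
    public void beforeBulk(long executionId, BulkRequest request) {
        // about to flush: request.numberOfActions(), request.estimatedSizeInBytes()
    }

    @Override
    public void afterBulk(long executionId, BulkRequest request, BulkResponse response) {
        // flushed: log response.getTookInMillis() and check response.hasFailures()
        // to measure how long each batch of documents took to index
    }

    @Override
    public void afterBulk(long executionId, BulkRequest request, Throwable failure) {
        // the whole bulk request failed
    }
})
        .setBulkActions(10000)                                // flush every 10k docs
        .setBulkSize(new ByteSizeValue(10, ByteSizeUnit.MB))  // ...or every 10 MB
        .setFlushInterval(TimeValue.timeValueSeconds(5))      // ...or every 5 seconds
        .setConcurrentRequests(4)                             // bulks in flight while the next one fills
        .build();

// feed documents from the pipeline; the processor batches and flushes them
bulkProcessor.add(new IndexRequest("my_index", "my_type").source(jsonDoc));

// drain whatever is left at the end
bulkProcessor.awaitClose(10, TimeUnit.MINUTES);
```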

By doing this, hopefully you can address the issue that you have.

Another thing that you may want to check, if you have not done so, is the HEAP_SIZE vs system RAM and the number of processors. In theory, HEAP_SIZE is good up to 32GB (I think that's what the documentation says; I normally set it to 30GB), but you need to leave at least 50% of system RAM to the OS for Lucene and indexing. In your case, with 128GB RAM, you should have no problem using a 30GB HEAP_SIZE. The number of processors can be specified in the ES configuration file, elasticsearch.yml. This setting will let ES behave better, and you won't see the mysterious error about running out of file descriptors or something similar from ES. The reason I mention this attribute is that it looks like you have other things running on the same machine, not just the ES instance.

If you run a process on this machine to index the data, you'll need to reserve some memory for it too; depending on the size of the documents and what you do with them before indexing, it should have adequate resources as well.

If, after trying these with the existing HW and resources, you are still not able to achieve the performance you would like, I think you'll need to split the process up so that the ES cluster is outside of this node and has more than one node.