Too many slices lead to bloat of thread pool queue and degradation of performance

Hello dear sirs,

We are using .esJsonRDD() to copy an index from an Elasticsearch cluster to Spark cluster, and lately we started to observe that this copy is taking 10-14 hours instead of 2-3. Here's what is going on.

Cluster and index setup

We have an Elasticsearch 6.8 cluster on AWS EC2 with 3 machines, 2 cores/14 Gb RAM each (i3.large).
Elasticsearch is configured to use 7Gb of memory.

The index that we copy contains 54 million documents, which correspond to 8 shards ~20 Gb each.

Performance metrics

CPU of one of the nodes is through the roof:

So is the Load Average.

Also we notice that thread pool queue is at size 60 and stays like this:

And search query latency is also very big on that instance:

Logs

We enabled logging of slow queries, and here's what we saw:

[2019-07-10T14:44:50,844][WARN ][index.search.slowlog.query] [AT1mr_C] [index-XXX][1] took[1.3s], took_millis[1327], total_hits[100044], types[person], stats[], search_type[QUERY_THEN_FETCH], total_shards[1], source[{"size":50,"query":{"match_all":{"boost":1.0}},"sort":[{"_doc":{"order":"asc"}}],"slice":{"field":"_id","id":63,"max":68}}], id[],
[2019-07-10T14:44:51,909][WARN ][index.search.slowlog.query] [AT1mr_C] [index-XXX][1] took[1.5s], took_millis[1570], total_hits[100006], types[person], stats[], search_type[QUERY_THEN_FETCH], total_shards[1], source[{"size":50,"query":{"match_all":{"boost":1.0}},"sort":[{"_doc":{"order":"asc"}}],"slice":{"field":"_id","id":32,"max":68}}], id[],
[2019-07-10T14:44:52,468][WARN ][index.search.slowlog.query] [AT1mr_C] [index-XXX][1] took[1.5s], took_millis[1539], total_hits[100337], types[person], stats[], search_type[QUERY_THEN_FETCH], total_shards[1], source[{"size":50,"query":{"match_all":{"boost":1.0}},"sort":[{"_doc":{"order":"asc"}}],"slice":{"field":"_id","id":59,"max":68}}], id[],
[2019-07-10T14:44:52,526][WARN ][index.search.slowlog.query] [AT1mr_C] [index-XXX][4] took[1s], took_millis[1034], total_hits[100538], types[person], stats[], search_type[QUERY_THEN_FETCH], total_shards[1], source[{"size":50,"query":{"match_all":{"boost":1.0}},"sort":[{"_doc":{"order":"asc"}}],"slice":{"field":"_id","id":11,"max":68}}], id[],

Basically, there are 68 parallel scrolls. Knowing that the thread pool size is 60 it is safe to say that most of the time those slices are waiting, not improving the speed of reading.

(I should say that we actually have 2 such indexes and do 2 esJsonRDD() in parallel.)

What I have seen from my own experiments with Elasticsearch performance under load, when thread pool becomes of non-zero size performance of that node degrades much faster, like in this case, leaving an impression that management of the thread pool consumes a lof of CPU and requires a lot of context switching (hence high load average).

An obvious fix would be to set the amount of parallel scroll requests (slices) so the throughput of a single node is enough.

I couldn't find any way to configure this slice number. The closest thing seems to be the es.input.max.docs.per.partition but I am not sure if it is going to work.

My question is:

How can we prevent Elasticsearch from getting clogged with too many parallel scroll slices, executed by esJsonRDD?

Or better, what would be the most effective way to dump contents of an Elasticsearch index to Spark?

Any advice or suggestion is welcome!

Thank you!

P.S. I was considering to file a bug in elasticsearch-hadoop since this slice-vs-thread pool size thing didn't seem right, but I was also not sure this description will be enough to narrow down the problem, so I decided to create a post here in hope that ES devs will see it and help me reason about the problem.

P.P.S. I have just checked, we use elasticsearch-hadoop=6.6.0 but I think little minor version difference should not impact. We use 8 spark slave machines.

Solution found: balanced cluster + replication

It is so sad to see topics on this forum without any closure, so here is ours, here is what we found to be working for us.

We noticed that we have had 8 shards and 3 nodes, which made cluster unbalanced - some of the nodes had more shards than others. elasticsearch-hadoop's magic seems to presume that cluster is balanced and tries to use its capacities for 100%, which results into one of the nodes being overloaded (as far as I can understand from the doc page about sliced scroll).

Then we noticed that if we change the replication factor from 0 to 1 the load on the cluster becomes palatable, which supported our hypothesis of unbalancedness being a factor.

In the next run we used 4 nodes (with 8 shards) and replication factor 1, and the job finished in 2.5 hours instead of 20 hours. We didn't perform further experiments and decided to keep this solution.

I hope this will help someone with similar problem in the future!

@lonlylocly Thanks for sharing your solution to the issue here, and apologies that it's been quiet on the forums lately. Developers respond to forum posts in their free time, but I'll see if I can set aside some personal time to get some more regular responses.

I'll share a brief bit of context for the sliced scrolls, since it's been one of the more trappy/contentious features added to ES-Hadoop. The feature was added to ES-Hadoop back in 5.0 and was originally meant to improve reading workflows on Spark clusters. Spark has a hard limit on how much data per partition it can process, and in some cases jobs would fail because too much data was being processed from a single ES shard.

Thus, es.input.max.docs.per.partition was added with a default of 100k documents per partition. It was several months later that we noticed such high amounts of issues from users that were overloading ES with high volumes of sliced requests. Now, as of 7.0, the setting's default value has been removed. ES-Hadoop once again defaults to creating a single input partition per shard, with the option to use sliced scrolls if the es.input.max.docs.per.partition is set.

@james.baiera Thank you for the information! Looking forward to migrate to 7.x!