We've been running a Hadoop/Hive (Apache 2.7.0) -> ES (1.5) integration for a while and things have worked reasonably well. One thing we've had to keep in mind when setting up scheduled batch jobs has been not to "overwhelm" the ES cluster with the data output from Hive, but still, pushing some 80 million documents in one job has not been a problem... until now.
We upgraded to ES 2.0 (including the latest jars in Hadoop) yesterday and several of our batch jobs that used to run fine are now failing. The ones failing all do so for the same reason:
Caused by: org.elasticsearch.hadoop.rest.EsHadoopInvalidRequest: Found unrecoverable error [x.x.x.x:9200] returned Too Many Requests(429) - rejected execution of org.elasticsearch.action.support.replication.TransportReplicationAction$PrimaryPhase$1@3c46fe20 on EsThreadPoolExecutor[bulk, queue capacity = 50, org.elasticsearch.common.util.concurrent.EsThreadPoolExecutor@593dcd36[Running, pool size = 8, active threads = 8, queued tasks = 50, completed tasks = 626670]]; Bailing out..
I've fiddled some with the batch settings such as es.batch.size.bytes and es.batch.size.entries, and while I do see some difference in behaviour, the problem definitely does not vanish.
I do know that I could increase the queue on the ES side, but I also know from experience that increasing the queue can lead to other problems (out of heap etc.), so I would like a pointer here: is the way to go to try to throttle the output from Hadoop, and if so, how? Or should I focus on making ES able to handle more requests? (Please note that there is no rush getting the data in there.)
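For what it's worth, the Hadoop-side throttling knobs are the per-task bulk size and retry settings. A sketch of what that could look like from Beeline, assuming ES-Hadoop 2.x property names; the connection URL, table names, and values are hypothetical and not tuned recommendations:

```shell
# Hypothetical beeline session: smaller bulks plus more patient retries make
# each task back off on 429s instead of bailing out after a few attempts.
beeline -u jdbc:hive2://hiveserver:10000 -e "
  SET es.batch.size.entries=500;       -- docs per bulk request, per task (default 1000)
  SET es.batch.size.bytes=512kb;       -- bytes per bulk request, per task (default 1mb)
  SET es.batch.write.retry.count=10;   -- retries after a 429 before failing (default 3)
  SET es.batch.write.retry.wait=30s;   -- wait between retries (default 10s)
  INSERT OVERWRITE TABLE my_es_backed_table
  SELECT * FROM my_source_table;
"
```

Since the rejections come from a full bulk queue rather than a hard failure, longer retry waits alone can be enough to let a slow-but-steady job complete.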
How many tasks do you have in hadoop/Hive and how many ES nodes?
We allow Hive (Beeline, really) to automatically assign mappers and reducers based on the data set size. Mappers can range from 2-3 up to 3000-4000, and reducers from 0 to 30-40. We anticipated that the number of tasks could have an impact on our problem, yet some jobs with thousands of mappers execute fine while others with just 8 mappers and zero reducers fail. So far there seems to be a much stronger correlation with the total number of documents generated by the job than with the number of tasks; however, jobs with zero reducers do seem more error prone. Perhaps what's more relevant is the number of tasks running at the same time in the cluster: a job of 4000 mappers will only have ~70-80 tasks running simultaneously in the Hadoop cluster. But again, we see this behaviour with jobs with only a couple of mappers and no reducers.
The ES cluster consists of 3 data nodes (8 cores/12GB heap), 3 master nodes and two client nodes.
Zero reducers means there's no shuffling and the mappers write directly to ES (which is typically the case). Of course the total number of documents has a direct impact, since that is the number that gets spread across all these tasks: the higher it is, the more docs assigned per task.
Do note that the more tasks you have, the higher the maximum number of docs being sent to ES at any one point as well.
Basically, with 70-80 tasks at once you end up with 20-30 tasks per data node, each sending (by default) bulk requests of 1000 docs / 1MB. That is roughly 30K docs / 30MB hitting each data node at once, or 80K docs / 80MB across the cluster.
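That back-of-the-envelope math can be sketched as follows (task and node counts are the ones from this thread; the per-bulk figures are ES-Hadoop's documented defaults):

```shell
# Rough concurrent-load estimate: how much bulk data is in flight per ES
# data node at any instant, given ES-Hadoop's default bulk size per task.
TASKS=80            # concurrent Hadoop tasks writing to ES
NODES=3             # ES data nodes
DOCS_PER_BULK=1000  # es.batch.size.entries default
MB_PER_BULK=1       # es.batch.size.bytes default (1mb)

TASKS_PER_NODE=$(( TASKS / NODES ))                  # tasks hitting each node
DOCS_PER_NODE=$(( TASKS_PER_NODE * DOCS_PER_BULK ))  # docs in flight per node
MB_PER_NODE=$(( TASKS_PER_NODE * MB_PER_BULK ))      # MB in flight per node
echo "${TASKS_PER_NODE} tasks/node -> ~${DOCS_PER_NODE} docs / ~${MB_PER_NODE}MB per node"
```

With a bulk queue capacity of 50 per node (as in the error above), 20-30 concurrent bulks per node already leaves little headroom once the 8 bulk threads are busy.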
Are you monitoring ES? Anything different in 2.0 vs 1.4? Is it CPU or IO bound?
While gathering more detailed data on your questions, we found out that since the upgrade all new shards are created solely on one of the data nodes. Not only has this pushed that single data node above 80% disk utilization (and yet all new shards are still created there...), but I also imagine that's why we have a new bottleneck, since this single node has to perform all the indexing...
Why all shards end up on this node still remains a mystery...
Is there some routing involved? Zone awareness or anything like that?
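That is quick to check from the REST API (assuming direct access to a node on port 9200; both endpoints exist in ES 1.x and 2.x):

```shell
# Shards-per-node and disk usage side by side; a skewed shard count combined
# with a nearly-full disk on one node points at disk-based allocation rules.
curl -s 'localhost:9200/_cat/allocation?v'

# Any explicit allocation filtering or awareness attributes configured?
curl -s 'localhost:9200/_cluster/settings?pretty'
```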
All this is probably unrelated to the actual topic, but might be of interest anyway. The reason why all shards ended up on one of the data nodes (node 3 in the table below) was that there was in fact an imbalance between the nodes. Something like this:
data-node-1: 4000 shards, 60% disk utilization
data-node-2: 4000 shards, 60% disk utilization
data-node-3: 3000 shards, 85% disk utilization
It seems the cluster was reluctant to relocate shards to node 3 since it was over 80% disk utilization. At the same time, the shard-count balancing determined that node 3 had the fewest shards, so all new shards ended up there, increasing the utilization even more and more or less causing a runaway effect. Allowing index creation on a node with 80%+ disk utilization was something we had not seen in ES 1.X.
As soon as we relocated and removed enough data from node 3 to get it under 80%, the cluster immediately started relocating shards to even out all indexes again. Before these relocations, every index created since our upgrade a couple of days ago had all its shards on node 3.
I hoped that all shards being created on one data node was the actual cause of the integration problems we've run into since the ES 2 upgrade, but even after all shards were relocated and evenly spread across the data nodes, and the cluster was "idling", we saw more or less the exact same failure pattern described above with the Hadoop jobs pushing data.
As far as IO/CPU utilization goes, IO is definitely within safe limits, whereas CPU was at ~70% on all data nodes during the Hadoop job. Even so, the jobs crash with the error described in my initial post.
It might simply be that our ES cluster is under-sized in relation to the Hadoop cluster, at least without any throttling applied. We did try reducing the number of tasks in our jobs, but that did not solve the problem. And again, we saw similar problems (Hadoop overwhelming ES) while on ES 1.X, but much more seldom.