Overloading during bulk write from Spark

I create a completely new index and write it to ES using the Spark/Hadoop library from Elastic.

I need a pretty big Spark cluster to do the calculations that build the index, and the last step is to write it to Elasticsearch. The problem is that the Spark cluster seems to overload the ES cluster.
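Roughly, the write step looks like this (a simplified sketch; the es.nodes value and index name are placeholders for our real settings):

    import org.apache.spark.{SparkConf, SparkContext}
    import org.elasticsearch.spark._   // adds saveToEs to RDDs

    // Simplified sketch of the final write step; the ES host and
    // index/type name below are placeholders, not our real values.
    val conf = new SparkConf()
      .setAppName("build-index")
      .set("es.nodes", "es-cluster.internal:9200")
    val sc = new SparkContext(conf)

    // `docs` stands in for the RDD produced by the upstream calculations;
    // each element is a Map of field -> value.
    val docs = sc.parallelize(Seq(Map("id" -> 1, "title" -> "example doc")))

    docs.saveToEs("my_new_index/doc")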

  1. How should I handle this situation, where a large compute cluster writes to a smaller ES cluster?
  2. Can I throttle the writing in some way using the ES-Hadoop/Spark library?
  3. I was told elsewhere that I could change how many documents I write in one POST and that this would make the write go faster. Is this true, and which parameter controls it?
  4. Is there any other way to make this more efficient, like turning off indexing until the write is done? Since I write and then swap an alias, I can refresh/reindex before the swap happens.

Here is an example of the errors I get. Usually the three retries mean the job eventually succeeds, but so many tasks fail that a lot of time is wasted on retries.

2017-03-20 10:48:27,745 WARN org.apache.spark.scheduler.TaskSetManager [task-result-getter-2] - Lost task 568.1 in stage 81.0 (TID 18982, ip-172-16-2-76.ec2.internal): org.apache.spark.util.TaskCompletionListenerException: Could not write all entries [41/87360] (maybe ES was overloaded?). Bailing out...
    at org.apache.spark.TaskContextImpl.markTaskCompleted(TaskContextImpl.scala:112)
    at org.apache.spark.scheduler.Task.run(Task.scala:102)
    at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:227)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
    at java.lang.Thread.run(Thread.java:745)

Which versions of the technologies are you using? In newer versions of ES-Hadoop we write out a sample of the errors encountered during the write, which will confirm whether you are being plagued by request rejections. You can also check the Elasticsearch log output for rejections.

In any case, we often find that repartitioning your data in Spark, using either repartition() or preferably coalesce() if you are lowering the number of partitions, is a simple way to throttle the rate of indexing into ES.
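For example, something along these lines (just a sketch; the partition count and index name are placeholders, and saveToEs comes from the elasticsearch-spark import):

    import org.apache.spark.rdd.RDD
    import org.elasticsearch.spark._   // brings saveToEs onto RDDs

    // Sketch: fewer partitions means fewer simultaneous bulk writers,
    // which throttles how hard Spark pushes on the ES cluster.
    // The partition count and index name are placeholders.
    def writeThrottled(docs: RDD[Map[String, Any]]): Unit = {
      docs
        .coalesce(200)  // coalesce avoids a full shuffle when only lowering the partition count
        .saveToEs("my_new_index/doc")
    }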

I'm using ES 1.7.6 and elasticsearch-spark 2.1.2, which I assume is too old to do the extra error logging.

The only errors/warnings are the ones shown above. The Spark UI shows a bunch of waiting tasks with no progress, so I jumped to the conclusion that the ES cluster couldn't handle 1200 simultaneous connections, one per partition writer.

Another responder on SO said these errors would "go away" if I increased the size of the chunk of data allowed in a single POST body, but I see no way to control this. From other reading, the connector uses the bulk write API, so he must be talking about the bulk API POST.

As to coalesce: the Spark cluster has the data partitioned into about 1200 partitions, or 4x the total number of cores. I assume the coalesce should target the capacity of the ES cluster, not Spark's. I have 3 c4.large nodes, which have plenty of memory for the rather small index but only 6 cores in total. In ES 5 the thread pool for bulk writes is 1x the number of cores, so it sounds like I should coalesce to 6 partitions? But it also sounds like increasing the bulk POST body size would help, maybe a good deal.

Does anyone know how to increase the bulk API POST body size? Would that help here?

Don't forget that each node maintains a request queue for bulk requests, which sits at fifty (last time I checked, but that was also on 5.x; it may be very different for 1.7). Any more than that and the cluster will reject the requests. The Spark connector tries to write only to nodes that host primary shards, so that a portion of the results that land on a node are already at home with their primary. I would say that coalescing down to only 6 partitions is a bit overkill. Mind you, the Spark connector will block during each bulk operation, so as long as your cluster can accept the full set of requests at once (not necessarily process them all at once), you shouldn't see rejections any more. I would hazard a rough calculation of (numberOfNodesHostingPrimaries * bulkRequestQueueLength) * 0.75 as probably an OK starting point; then you can adjust upward or downward as needed.
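To make that concrete with the numbers in this thread (3 data nodes and a queue length of 50, both of which you should verify against your own cluster):

    // Rough starting point for the number of write partitions, using the
    // formula above with this thread's numbers.
    val nodesHostingPrimaries = 3    // the 3 c4.large data nodes
    val bulkRequestQueueLength = 50  // bulk queue length last I checked (5.x); check yours
    val targetPartitions =
      (nodesHostingPrimaries * bulkRequestQueueLength * 0.75).toInt  // = 112

    // Then coalesce down to that before the write, e.g.:
    // docs.coalesce(targetPartitions).saveToEs("my_new_index/doc")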

Thanks, knowing about the queue makes a big difference, and since the job doesn't fail outright, we can't be that far off with the write tasks. 1200 down to ~112 partitions is where we'll start.
