Throttling indexing to Elasticsearch in Spark

Is there a way to throttle the number of tasks that index into Elasticsearch without also throttling the number of tasks used for the compute stages (map, flatMap) in Spark?

We're using the native ES-Hadoop plugin.


@shakdoesspark The number of tasks used to index into ES is based on the number of shards in ES.

So I have 5 shards on my index, 0 replicas, and I'm using the RDD.saveToEsWithMeta method, but I'm seeing 256 tasks being created. I have 8 nodes in my Spark cluster, and I've set --executor-cores to 4.

I'm seeing that it's running 32 tasks (8 nodes * 4 cores per executor).

Not true: The number of tasks used to READ from ES is based on the number of shards it is reading from. You can write to ES using any number of tasks; there's no way for the library to control this, as it is a user setting in both Hadoop and Spark. We just suggest using a number of partitions equal to the number of shards being written to as a starting point, and tuning from there.

@shakdoesspark I would advise using the RDD.repartition(x: Int) method to shrink the number of splits or to modify the original number of splits on the RDD to be a lower number.
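
For example, a minimal sketch (assuming an existing SparkContext sc; the index name, document shape, and the target of 5 partitions are placeholders matching the 5-shard example above):

import org.elasticsearch.spark._

// Dummy input: (document id, document) pairs for saveToEsWithMeta
val docs = sc.parallelize(1 to 1000).map(i => (i, Map("title" -> s"doc $i")))

// Compute stages keep their original parallelism...
val enriched = docs.mapValues(doc => doc + ("indexed" -> "true"))

// ...and only the write stage is shrunk, here to one task per target shard
enriched.repartition(5).saveToEsWithMeta("my-index")

Because repartition inserts a shuffle boundary, the map work upstream still runs at full parallelism; only the stage that actually writes to ES is reduced to 5 tasks.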

From my experience the answer is no. I have been saving my processed data into HDFS or S3, then having a separate job read it and push it to ES.
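
Roughly this pattern, as a sketch only (the paths, index name, and esConfig are placeholders, and processedDf stands for whatever the compute job produces):

// Job 1: heavy compute at full parallelism, persisted to cheap storage
processedDf.write.mode("overwrite").parquet("s3://my-bucket/processed/")

// Job 2: a separate, smaller application that only does the indexing
import org.elasticsearch.spark.sql._
val toIndex = spark.read.parquet("s3://my-bucket/processed/")
toIndex.repartition(5).saveToEs("my-index", esConfig)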

I've also been using Zeppelin notebooks, so setting --num-executors and cores is a little more difficult. Instead I've been setting them directly on the config before calling saveToEs:

spark.conf.set("spark.dynamicAllocation.enabled","false")
spark.conf.set("spark.executor.instances", "8") // --num-executors
spark.conf.set("spark.executor.cores", "4");
df.saveToEs(esConfig)

However, when I visit the Executors tab in the Spark History UI, the summary never matches what I set.

I find it difficult to get visibility into the tasks that are actually running. I have enabled the extra logging, but those logs are created on each task node and I haven't tried to use them to gauge task usage yet.
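
For what it's worth, that mismatch is expected as far as I can tell: spark.executor.instances, spark.executor.cores, and spark.dynamicAllocation.enabled are only read when the application starts, so setting them with spark.conf.set on a running session has no effect. A sketch of setting them up front instead (the app name and values are just examples):

import org.apache.spark.sql.SparkSession

// Standalone job: fix executor sizing before the SparkContext exists
val spark = SparkSession.builder()
  .appName("es-indexing")
  .config("spark.dynamicAllocation.enabled", "false")
  .config("spark.executor.instances", "8") // --num-executors
  .config("spark.executor.cores", "4")     // --executor-cores
  .getOrCreate()

// In Zeppelin the session is created for you, so these keys belong in the
// Spark interpreter settings (or spark-submit flags), not in spark.conf.set.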

RDD.repartition works great!
@jspooner, you should give that a try!

You're better off using df.coalesce(...) than df.repartition(...) for Spark efficiency.

After coalesce, why would you need to repartition?

No, I mean use coalesce instead of repartition.
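
For reference, a minimal sketch of what that looks like (the index name and the target of 5 partitions are placeholders):

import org.elasticsearch.spark.sql._

val df = spark.range(0, 1000).selectExpr("id", "concat('doc ', id) AS title")

// coalesce(5) merges existing partitions without a full shuffle,
// whereas repartition(5) shuffles everything into exactly 5 partitions.
df.coalesce(5).saveToEs("my-index")

One thing to watch: because coalesce avoids the shuffle, Spark may also run the upstream transformations in the same reduced stage, which works against the original goal of throttling only the write; repartition keeps the compute at full parallelism at the cost of a shuffle.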

Ah ok, thanks for the clarification.