Throttle the ES-Hadoop write speed

1. Problem background

I am using an Elasticsearch cluster to store the results from Spark 2.3.
The same cluster also serves online queries.
My Spark job is a daily task that writes 6 million records to the ES cluster every day.
Currently the write takes about 10 minutes per day, at an indexing rate of roughly 10k documents per second. During those 10 minutes some queries take more than 1 second, yet the same queries at any other time of the day (outside those 10 minutes) are very fast (about 10 milliseconds).

So I think the ES cluster may be overloaded, and I want to lower the write speed (I can accept a longer write) so that the online queries stay fast.

The ES-Hadoop Maven dependency (artifact shown for Spark 2.x / Scala 2.11; the version is only an example):

        <dependency> <!-- Spark dependency -->
            <groupId>org.elasticsearch</groupId>
            <artifactId>elasticsearch-spark-20_2.11</artifactId>
            <version>7.9.0</version> <!-- example version -->
        </dependency>


2. What I have tried

  1. Use fewer Spark executors:

    --executor-cores 1   --num-executors 1

  2. Reduce the write batch size and disable the refresh:

     SparkConf sparkConf = new SparkConf()
                .set(ConfigurationOptions.ES_BATCH_SIZE_ENTRIES, "50")
                .set(ConfigurationOptions.ES_BATCH_WRITE_REFRESH, "false");
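
Besides the two settings above, ES-Hadoop documents a few more `es.batch.*` options that limit write pressure; the values below are only illustrative:

    es.batch.size.entries = 50       # max docs per bulk request (default 1000)
    es.batch.size.bytes = 100kb      # max bulk request size (default 1mb)
    es.batch.write.refresh = false   # do not refresh the index after each bulk
    es.batch.write.retry.wait = 30s  # wait between bulk retries when ES pushes back (default 10s)

Note that these cap the size of each bulk request but add no pause between requests, so the overall throughput can still be high.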

But the speed is still too high for me.
I want to know if there is any other way to throttle the write speed with ES-Hadoop.

Sorry to bother you.

Can anybody help me?
If there is no way, I will have to drop ES-Hadoop and write to Elasticsearch manually with the Elasticsearch Java client :joy:

I rewrote org.elasticsearch.spark.rdd.EsRDDWriter.write(),

just like this:

/**
 * Created by chencc on 2020/8/31.
 */
class MyEsDataFrameWriter(schema: StructType, override val serializedSettings: String)
  extends EsRDDWriter[Row](serializedSettings: String) {

  override protected def valueWriter: Class[_ <: ValueWriter[_]] = classOf[DataFrameValueWriter]
  override protected def bytesConverter: Class[_ <: BytesConverter] = classOf[JdkBytesConverter]
  override protected def fieldExtractor: Class[_ <: FieldExtractor] = classOf[DataFrameFieldExtractor]

  // Pair each row with its schema, as EsDataFrameWriter does
  override protected def processData(data: Iterator[Row]): Any = { (data.next, schema) }

  override def write(taskContext: TaskContext, data: Iterator[Row]): Unit = {
    val writer = RestService.createWriter(settings, taskContext.partitionId.toLong, -1, log)

    taskContext.addTaskCompletionListener((TaskContext) => writer.close())

    if (runtimeMetadata) {
      writer.repository.addRuntimeFieldExtractor(metaExtractor)
    }

    // Throttle: after every 2000 documents, pause for 50 milliseconds
    val counter = new AtomicInteger(0)
    while (data.hasNext) {
      writer.repository.writeToIndex(processData(data))
      if (counter.incrementAndGet() >= 2000) {
        counter.set(0)
        log.info("batch is 2000, will sleep 50 milliseconds")
        Thread.sleep(50)
      }
    }
  }
}
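
The throttling logic in the loop above boils down to "sleep for a fixed interval after every N documents". A minimal standalone sketch of that idea in Java (the class name and numbers are illustrative, not part of ES-Hadoop):

```java
import java.util.concurrent.atomic.AtomicInteger;

/** Sleeps for a fixed interval after every batchSize records, capping write throughput. */
class WriteThrottler {
    private final int batchSize;
    private final long sleepMillis;
    private final AtomicInteger counter = new AtomicInteger(0);

    WriteThrottler(int batchSize, long sleepMillis) {
        this.batchSize = batchSize;
        this.sleepMillis = sleepMillis;
    }

    /** Call once per written record; pauses whenever a full batch has gone out. */
    void onRecordWritten() {
        if (counter.incrementAndGet() >= batchSize) {
            counter.set(0);
            try {
                Thread.sleep(sleepMillis);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt(); // preserve the interrupt status
            }
        }
    }
}
```

With a batch of 2000 and a 50 ms pause, 6 million records add at most 3000 pauses, i.e. about 150 extra seconds spread over the whole job.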
