Throttle the ES-Hadoop write speed

1. Problem background

I am using an Elasticsearch cluster to store the results of a Spark 2.3 job.
The same cluster also serves online queries.
My Spark task is a daily batch job that writes about 6 million records to the ES cluster every day.
Right now the write takes about 10 minutes per day, at an indexing speed of roughly 10k documents per second. During those 10 minutes some queries take more than 1 second, but if I run the same query at any other time of the day (outside those 10 minutes) the response is very fast (about 10 milliseconds).

So I think the ES cluster is overloaded during the write, and I want to lower the write speed (I can accept a longer write time) so that the online queries stay fast.

The ES-Hadoop Maven dependencies:


        <dependency> <!-- Spark dependency -->
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-core_2.11</artifactId>
            <version>2.3.0</version>
        </dependency>

        <dependency>
            <groupId>org.elasticsearch</groupId>
            <artifactId>elasticsearch-spark-20_2.11</artifactId>
            <version>7.8.1</version>
        </dependency>

2. What I have tried

  1. Use fewer Spark executors:

    --executor-cores 1   --num-executors 1

  2. Reduce the write batch size and disable the index refresh after writing (a combined sketch of these options is shown after this list):
    SparkConf sparkConf = new SparkConf()
            ...
            .set(ConfigurationOptions.ES_BATCH_SIZE_ENTRIES, "50")
            .set(ConfigurationOptions.ES_BATCH_WRITE_REFRESH, "false");
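
For completeness, here is a combined sketch of these batch settings in one place. The values are only examples, and es.batch.size.bytes / es.batch.write.retry.wait are additional ConfigurationOptions that I have not verified on my cluster:

    import org.apache.spark.SparkConf
    import org.elasticsearch.hadoop.cfg.ConfigurationOptions

    // Sketch only: example values, assuming the job is submitted with
    //   spark-submit --executor-cores 1 --num-executors 1 ...
    val sparkConf = new SparkConf()
      // smaller bulk requests: flush after 50 docs or 100kb, whichever comes first
      .set(ConfigurationOptions.ES_BATCH_SIZE_ENTRIES, "50")       // es.batch.size.entries
      .set(ConfigurationOptions.ES_BATCH_SIZE_BYTES, "100kb")      // es.batch.size.bytes
      // do not force an index refresh after the data is written
      .set(ConfigurationOptions.ES_BATCH_WRITE_REFRESH, "false")   // es.batch.write.refresh
      // wait longer between retries when ES rejects bulk requests
      .set(ConfigurationOptions.ES_BATCH_WRITE_RETRY_WAIT, "30s")  // es.batch.write.retry.wait

ES-Hadoop flushes a bulk request when either the entry limit or the byte limit is reached, so both size settings matter.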

But even with these settings the write speed is still too high for me.
Is there any other way to throttle the write speed with ES-Hadoop?
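
One more idea I have considered but not tested: reduce the number of partitions that write concurrently, since (as far as I understand) each Spark task keeps its own bulk writer against the cluster. A rough sketch, where resultDf and the index name are placeholders for my real job:

    import org.elasticsearch.spark.sql._

    // Sketch only: fewer partitions means fewer concurrent bulk writers hitting ES.
    resultDf
      .coalesce(1)                 // write from a single task
      .saveToEs("daily-results")   // hypothetical target index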

Sorry to bother you.

Can anybody help me?
If there is no way, I will have to drop ES-Hadoop and write to Elasticsearch manually with the Elasticsearch Java client :joy:

I ended up overriding org.elasticsearch.spark.rdd.EsRDDWriter.write() myself, like this:


import java.util.concurrent.atomic.AtomicInteger

import org.apache.spark.TaskContext
import org.apache.spark.sql.Row
import org.apache.spark.sql.types.StructType
import org.elasticsearch.hadoop.rest.RestService
import org.elasticsearch.hadoop.serialization.{BytesConverter, JdkBytesConverter}
import org.elasticsearch.hadoop.serialization.builder.ValueWriter
import org.elasticsearch.hadoop.serialization.field.FieldExtractor
import org.elasticsearch.spark.rdd.EsRDDWriter
import org.elasticsearch.spark.sql.{DataFrameFieldExtractor, DataFrameValueWriter}

/**
  * Created by chencc on 2020/8/31.
  *
  * A DataFrame writer that throttles the indexing speed by sleeping after
  * every 100 documents. The `log` field is inherited from EsRDDWriter.
  */
class MyEsDataFrameWriter(schema: StructType, override val serializedSettings: String)
  extends EsRDDWriter[Row](serializedSettings) {

  override protected def valueWriter: Class[_ <: ValueWriter[_]] = classOf[DataFrameValueWriter]
  override protected def bytesConverter: Class[_ <: BytesConverter] = classOf[JdkBytesConverter]
  override protected def fieldExtractor: Class[_ <: FieldExtractor] = classOf[DataFrameFieldExtractor]

  // Each record is written together with the schema, as DataFrameValueWriter expects.
  override protected def processData(data: Iterator[Row]): Any = (data.next, schema)

  override def write(taskContext: TaskContext, data: Iterator[Row]): Unit = {
    val writer = RestService.createWriter(settings, taskContext.partitionId.toLong, -1, log)

    taskContext.addTaskCompletionListener((_: TaskContext) => writer.close())

    if (runtimeMetadata) {
      writer.repository.addRuntimeFieldExtractor(metaExtractor)
    }

    // Throttle: after every 100 documents, pause for 100 ms before continuing.
    val counter = new AtomicInteger(0)
    while (data.hasNext) {
      counter.incrementAndGet()
      writer.repository.writeToIndex(processData(data))
      if (counter.get() >= 100) {
        Thread.sleep(100)
        counter.set(0)
        log.info("wrote 100 records, sleeping 100 ms to throttle the write speed")
      }
    }
  }
}
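
To run it, I call the writer the same way EsSparkSQL.saveToEs wires up its own writer internally. A rough sketch (here spark, resultDf and the index name are placeholders, and depending on the ES-Hadoop version this code may have to live in an org.elasticsearch.spark sub-package because some of these classes are package-private):

    import org.elasticsearch.hadoop.cfg.{ConfigurationOptions, PropertiesSettings}
    import org.elasticsearch.spark.cfg.SparkSettingsManager

    // Sketch only: mirrors what EsSparkSQL.saveToEs does internally,
    // but hands the work to the throttled writer above.
    val sparkCfg = new SparkSettingsManager().load(spark.sparkContext.getConf)
    val esCfg = new PropertiesSettings().load(sparkCfg.save())
    esCfg.setProperty(ConfigurationOptions.ES_RESOURCE_WRITE, "daily-results")

    spark.sparkContext.runJob(
      resultDf.rdd,
      new MyEsDataFrameWriter(resultDf.schema, esCfg.save()).write _
    )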
