Pushback to hadoop from es on bulk load


When we load data from Hadoop into Elasticsearch, we keep seeing errors like this in the tasks:
org.elasticsearch.hadoop.EsHadoopException: Could not write all entries [99/347072] (maybe ES was overloaded?).
Bailing out...
Since our Hadoop cluster can load/read data at an enormous rate, I am not surprised that our (much smaller) Elasticsearch cluster cannot keep up. Fair enough. So this question is not about optimizing Elasticsearch for faster indexing.
My question is: why can't Elasticsearch do some kind of pushback to slow the Hadoop job down to a speed that Elasticsearch can handle? It seems Elasticsearch will happily keep ingesting data at a rate it simply cannot sustain...

(Costin Leau) #2

Actually, Elasticsearch does push back: when it's overloaded, some entries will be processed and some rejected. The connector retries the rejected entries a number of times (which is configurable). If there are still rejections after X retries, the job bails out.

This is on purpose, since the connector holds no state: there's no buffer in which to keep the in-flight data.
The user however can:

  • increase the number of retries
  • reduce the number of hadoop tasks/jobs
  • reduce the batch size in docs/bytes (which, by the way, is per task, so a job with 100 tasks will result in 100 x (docs) being sent at once, one batch per connection).
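
To make the per-task arithmetic in the last bullet concrete, here is a minimal sketch. The function name and the sample numbers are illustrative only; I believe the batch size is governed by es.batch.size.entries and es.batch.size.bytes, but treat that mapping as an assumption and check it against the connector documentation for your version.

```python
def docs_in_flight(tasks: int, batch_entries: int) -> int:
    """Upper bound on documents sent to Elasticsearch at once.

    Assumes every task has exactly one full bulk request in flight,
    which is a simplification of what a real job does.
    """
    if tasks < 0 or batch_entries < 0:
        raise ValueError("tasks and batch_entries must be non-negative")
    return tasks * batch_entries


# Illustrative numbers only: 100 tasks, 1000 docs per bulk request.
before = docs_in_flight(tasks=100, batch_entries=1000)

# Halving either the task count or the batch size halves the peak load.
fewer_tasks = docs_in_flight(tasks=50, batch_entries=1000)
smaller_batches = docs_in_flight(tasks=100, batch_entries=500)
```

Either knob reduces the peak load on Elasticsearch by the same factor, which is why reducing the number of tasks and reducing the batch size are listed side by side.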

Potentially, the retry limit can be disabled so the connector can keep on retrying over and over again - in which case one would have to cancel the job if not pleased with performance.

(Costin Leau) #3

By the way, I've just pushed an improvement to master: setting es.batch.write.retry.count to a negative value will mean infinite retries.
Note this is available in 2.1 only; you can try it out in the next nightly build.



Do I understand correctly that Elasticsearch relies on the failure of Hadoop tasks for pushback?
So the errors we see (EsHadoopException, maybe ES was overloaded?) are basically a form of pushback?

That's tricky to me for a few reasons:

  1. Hadoop tasks are not a very granular form of pushback. Retrying a task might mean millions of inserts get redone.
  2. Hadoop task failures are errors from which Hadoop tries to recover. When engineers see these errors and the stack traces, they think something is wrong, and they will tune the system until the errors go away (looking for pushback mechanisms, like I did).

(Costin Leau) #5

No. The error is not a pushback but rather the connector giving up.

The term pushback doesn't really apply here, since there's no bi-directional communication between Hadoop and the connector: the connector cannot say "there's too much data, slow down". The connector is simply told to write data. When Elasticsearch is under load, the connector keeps retrying, each time with only the data that remains (typically less and less), which gives Elasticsearch breathing space and holds the Hadoop job back from writing more data.

To reiterate what I was saying in my previous post: it's only Elasticsearch that pushes back. The connector takes that into account through es.batch.write.retry.count and es.batch.write.retry.wait - that is, wait a bit and retry again.
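
The wait-and-retry behaviour described above can be sketched roughly as follows. This is a toy model, not the connector's actual code: `send_bulk`, `write_with_retries`, `make_overloaded_es` and `BulkWriteError` are hypothetical names, and the two parameters only mirror the roles of es.batch.write.retry.count and es.batch.write.retry.wait.

```python
import time
from typing import Callable, List, Sequence


class BulkWriteError(RuntimeError):
    """Entries were still rejected after all retries (the 'bailing out' case)."""


def write_with_retries(
    entries: Sequence[str],
    send_bulk: Callable[[Sequence[str]], List[str]],
    retry_count: int = 3,
    retry_wait_s: float = 0.0,
) -> int:
    """Send entries, resending only the rejected ones; return the retries used.

    send_bulk is a hypothetical callable returning the entries that were
    rejected. A negative retry_count means retry forever.
    """
    remaining = send_bulk(entries)
    retries = 0
    while remaining:
        if 0 <= retry_count <= retries:
            raise BulkWriteError(
                f"Could not write all entries [{len(remaining)}/{len(entries)}]"
            )
        # The task blocks here, so it pushes no new data while waiting.
        time.sleep(retry_wait_s)
        remaining = send_bulk(remaining)
        retries += 1
    return retries


def make_overloaded_es(capacity: int) -> Callable[[Sequence[str]], List[str]]:
    """Fake cluster that accepts at most `capacity` entries per bulk request."""
    def send_bulk(batch: Sequence[str]) -> List[str]:
        return list(batch[capacity:])
    return send_bulk


docs = [f"doc-{i}" for i in range(10)]
retries_used = write_with_retries(docs, make_overloaded_es(4))
```

Each retry carries a smaller batch than the one before, and the sleep is the only thing slowing the task down. Once the retry limit is reached with entries still outstanding, the only remaining option in this model is to raise, which corresponds to the task failure discussed earlier in the thread.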

Hadoop doesn't support pushback, so the connector cannot push data back to the source; it can only stall the pipeline, which is not ideal but somewhat effective (depending on how many tasks one has).

I see, okay.

We will try changing es.batch.write.retry.count and es.batch.write.retry.wait.
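
For anyone else trying the same thing, a minimal sketch of how the two settings could be collected and handed to a job. The values are illustrative, not recommendations, and `to_hadoop_args` is a hypothetical helper: passing `-Dkey=value` on the command line only works if the job parses Hadoop's generic options (for example via ToolRunner), which is an assumption about your job.

```python
from typing import Dict, List

# Illustrative values only; tune them against your own cluster.
es_write_settings: Dict[str, str] = {
    # Number of retries before the task gives up
    # (per the post above, a negative value means retry forever in 2.1).
    "es.batch.write.retry.count": "10",
    # How long to wait between retries.
    "es.batch.write.retry.wait": "30s",
}


def to_hadoop_args(settings: Dict[str, str]) -> List[str]:
    """Render settings as -Dkey=value arguments, in insertion order."""
    return [f"-D{key}={value}" for key, value in settings.items()]


args = to_hadoop_args(es_write_settings)
```

The same dictionary could instead be copied into the job's Configuration object in code; the command-line form is just convenient for experimenting without rebuilding the job.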

(Anastasios Zouzias) #7

Costin, thank you for the detailed answer.

I had a similar issue with Spark and it is now fixed by reducing the number of executors.

(Guillermo Ortiz) #8

How many executors, ES nodes and documents indexed per second?

(Kamaldeep Singh) #9

@costin are you guys looking into improving the connector so that it maybe slows down the write speed automatically?

Sorry, I also started another thread about this:
Throttling write speed in ES-Hadoop Connector