[SPARK] es.batch.write.retry.count negative value is ignored

Hi,

We are testing a indexing pipeline where data would be processed by spark, enriched, and sent to Elasticsearch for indexing. We are having recurrent org.elasticsearch.hadoop.rest.EsHadoopNoNodesLeftException errors, typically about 10 min. after starting the job.

My understanding from the references I have been able to find is that this is caused by Elasticsearch not having enough capacity to process all the data sent by spark, and if the error is transient, it can be mitigated by having a larger value in the es.batch.write.retry.count setting, or even a negative value (meaning infinite retries).

This doesn't seem to fit our experience:

  • ES cluster is not at the capacity limit (at least, CPU is below 40% and IO is well bellow capacity according to iostat), and the cluster remains in «green» status for the whole time. We have even eliminated parallelism on spark side by limiting the number of executors and cores to 1.

  • Setting es.batch.write.retry.count to a negative value (or to a very large value) doesn't make the error go away. Now, I would expect that even if the network connection between spark <-> ES was interrupted, this configuration parameter would prevent the error (the indexing process would just 'stall' on spark side until the connection was restablished)

We are using the elasticsearch-spark.2-10-2.1.0. jar. This is how our spark context is created.

val conf = new SparkConf();
conf.set("es.nodes", config.elasticsearch_nodes);
conf.set("es.batch.write.retry.count", "-1");
conf.set("es.batch.write.retry.wait", "100");
val sc = new SparkContext(conf);

It may be useful to know that marvel shows the number of segments oscillating a bit below 100. We have set indices.store.throttle.type to "none".

So there are actually three questions here:

  1. How can it be that we see EsHadoopNoNodesLeftException with a negative value for retry.count? This is the critical issue: if we can get spark to retry after a while I would expect ES would be able to resume indexing.
  2. Why would ES fail way before getting close to its CPU & IO limits? Can we expect to get reasonably close to 100% CPU or 100% IO when indexing?
  3. Is it this just not the way to create massive indexes? Would a "pull" architecture, with spark dumping the data into some store and ES pulling from this store be more reliable? Sadly rivers are deprecated now so I don't how how one would implemente a pull pipeline.

First off, thanks for the well organized and formatted post - made reading it a breeze.

Your understanding is correct; when too many tasks from Hadoop/Spark are hitting Elasticsearch, the latter starts rejecting documents which the connector retries based on the given configuration.

  1. Note that this is per index not per cluster. That is, through the connector, one writes to one index (or multiple) but not the entire cluster. It might be that the index is spread across all Elasticsearch nodes but more often than not, that is not the case.
    As such, you won't see the load on all nodes. With Marvel you should be able to pinpoint what nodes are used. If you reduced your executor to 1, then really you have one Spark task hitting one node in Elastic which takes all the load.
    However it should be able to cope with it without any issues.

  2. es.batch.write.retry.count should work. Note that the connector has two types of retries:

  • one for network hiccups; in which case things are being retried after a timeout on a different node (the connection is unreliable with the initial node)
  • one for documents rejected. The connection works however the ES node keeps rejecting documents and as such, the connector retries them. This bit is configured through es.batch - it's a logical retry that relies on waiting and number of retries.
    A negative value should indeed retry over and over again, after waiting for 100ms. It's best to specify the time unit (100s vs 10s) since the default might not be always obvious.

More about them here.

  1. Indeed it should keep on retrying. It's not clear what causes the timeout after 10 min. Either way, this is not expected behavior and I would consider it a bug (even if the retries apply but something else leads to the disconnect, this should be properly documented).

  2. see above. Make sure you look at the node that is being used vs the whole cluster. Even then, ES is designed to cope with load as in, rather being completely overloaded and not be able to take any type of requests, it does provide itself with enough breathing space to take other requests and thus limit heavy indexing/reading.

  3. I would say it is. The pull architecture running in Elastic is not suitable since it cannot run in Elastic (as it affects stability) and it's the reason why rivers where deprecated. This pull architecture acts as a liaison between the two parties and it's pushback (retry, stop reading) should be good enough going forward.
    And if that's not the case, it should be addressed.

In other words, I think your post, understanding and expectations are valid; I'm not sure what is causing the current behavior but I'd consider it a bug and that it should be fixed.

The 10m timeout indicates somewhere there's a timeout/deadline that is being hit. I'm wondering if that's something controlled through Spark - namely a task has not completed in 10 min, we don't have any report of it being alive, it should be killed.

Can you please turn on logging on org.elasticsearch.hadoop.rest package all the way to TRACE and post the logs as a gist somewhere. Potentially turning on logging or keeping an eye on Spark logs might provide some useful info.

P.S. I'm currently on leave so I might not reply as fast as I'd like however I'll be checking this thread until my return.

Hi Costin,

Thank-you for looking into the issue, it's greatly appreciated.

Just a quick update on this: after some debugging, we have been able to pinpoint the issue. The problem was that another step of the task was opening a new HTTP connection for every document; after 10 min the spark node would exhaust the number of available TCP ports and this caused a network connection error to elasticsearch. As you described, SimpleHttpRetryPolicy does not retry the same node if it sees a network connection error (only when ES returns a 503 Service Unavailable error), so the list of nodes would be quickly exhausted and es.batch.write.retry.count had indeed no effect.

I would suggest clarifying in the documentation that es.batch.write.retry.count will not retry network errors, and —maybe— adding a setting to enable retrying network errors to the same node (although in our case that would have been problematic, since it would have masked the issue)

Thanks again for your help!!

Hi Angel,

Glad to hear you identified the problem. Agreed that the docs could use some improvements on this front.
As for the RetryPolicy the problem is understanding what the underlying error is - everything is munged into the generic IOException which makes it tricky to understand whether the network is down or whether the host itself has an issue.
By any chance, can you "catch" the stacktrace of the exception raised when the TCP ports ran out?

Cheers,

Hi

This is the stracktrace for hadoop.rest exception:

15/07/14 12:48:33 task-result-getter-1 WARN TaskSetManager: Lost task 227.1 in stage 1.0 (TID 238, ip-172-30-10-86.ec2.internal): org.elasticsearch.hadoop.rest.EsHadoopNoNodesLeftException: Connection error (check network and/or proxy settings)- all nodes failed; tried [[52.2.225.22:9200]] 
at org.elasticsearch.hadoop.rest.NetworkClient.execute(NetworkClient.java:135)
at org.elasticsearch.hadoop.rest.RestClient.execute(RestClient.java:317)
at org.elasticsearch.hadoop.rest.RestClient.execute(RestClient.java:301)
at org.elasticsearch.hadoop.rest.RestClient.execute(RestClient.java:305)
at org.elasticsearch.hadoop.rest.RestClient.get(RestClient.java:119)
at org.elasticsearch.hadoop.rest.RestClient.discoverNodes(RestClient.java:101)
at org.elasticsearch.hadoop.rest.InitializationUtils.discoverNodesIfNeeded(InitializationUtils.java:58)
at org.elasticsearch.hadoop.rest.RestService.createWriter(RestService.java:372)
at org.elasticsearch.spark.rdd.EsRDDWriter.write(EsRDDWriter.scala:40)
at org.elasticsearch.spark.rdd.EsSpark$$anonfun$saveToEs$1.apply(EsSpark.scala:67)
at org.elasticsearch.spark.rdd.EsSpark$$anonfun$saveToEs$1.apply(EsSpark.scala:67)
at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:63)
at org.apache.spark.scheduler.Task.run(Task.scala:70)
at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:213)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
at java.lang.Thread.run(Thread.java:745)

Just in case it's useful, this how other steps that attempt to create HTTP connections with java.net. HttpURLConnection manifest the problem:

Caused by: java.net.NoRouteToHostException: Cannot assign requested address
at java.net.PlainSocketImpl.socketConnect(Native Method)
at java.net.AbstractPlainSocketImpl.doConnect(AbstractPlainSocketImpl.java:339)
at java.net.AbstractPlainSocketImpl.connectToAddress(AbstractPlainSocketImpl.java:200)
at java.net.AbstractPlainSocketImpl.connect(AbstractPlainSocketImpl.java:182)
at java.net.SocksSocketImpl.connect(SocksSocketImpl.java:392)
at java.net.Socket.connect(Socket.java:579)
at java.net.Socket.connect(Socket.java:528)
at sun.net.NetworkClient.doConnect(NetworkClient.java:180)

As expected, the underlying Java exception is fairly cryptic - NoRouteToHost.
I've raised an issue to provide better logging on why a node is dropped and to look at individual exceptions and bubble them up in case of failure rather then retrying the connection (as that hides the underlying cause).

As a quick update, this has been addressed in master and will be part of the next 2.1.1 release.
In particular exceptions NoRouteToHost, Bind and UnknownIP are not just logged but rather considered fatal since they indicate a rather serious problem with the setup which cannot be simply addressed with a separate node.

Cheers,