We are testing an indexing pipeline where data is processed by Spark, enriched, and sent to Elasticsearch for indexing. We are seeing recurrent org.elasticsearch.hadoop.rest.EsHadoopNoNodesLeftException errors, typically about 10 minutes after the job starts.
My understanding from the references I have been able to find is that this is caused by Elasticsearch not having enough capacity to process all the data Spark sends, and that if the error is transient it can be mitigated by raising the es.batch.write.retry.count setting, or even giving it a negative value (meaning infinite retries).
This doesn't seem to fit our experience:
The ES cluster is not at its capacity limit (at the very least, CPU stays below 40% and IO is well below capacity according to iostat), and the cluster remains in "green" status the whole time. We have even eliminated parallelism on the Spark side by limiting the number of executors and cores to 1, as sketched below.
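For concreteness, pinning the job down to a single task slot looks roughly like this (spark.executor.instances is the YARN property; standalone mode would use spark.cores.max instead):

import org.apache.spark.SparkConf

// Sketch: one executor with one core, so at most one task writes to ES
// at any time (property names are standard Spark configuration).
val conf = new SparkConf()
  .set("spark.executor.instances", "1")
  .set("spark.executor.cores", "1")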
Setting es.batch.write.retry.count to a negative value (or to a very large one) does not make the error go away. I would expect that even if the network connection between Spark and ES were interrupted, this parameter would prevent the error: the indexing process would simply stall on the Spark side until the connection was re-established.
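One thing I wonder about: if I read the elasticsearch-hadoop documentation correctly, the bulk-retry setting only covers documents rejected by an overloaded ES, while broken connections are governed by separate HTTP-level settings, and exhausting those for every configured node is what produces EsHadoopNoNodesLeftException. We never set these anywhere, so presumably we are running the documented defaults, sketched here for reference:

import org.apache.spark.SparkConf

// Sketch of the connection-level knobs (names and default values taken
// from the elasticsearch-hadoop documentation; we leave them untouched).
val httpConf = new SparkConf()
  .set("es.http.timeout", "1m")      // timeout for each HTTP/REST request to ES
  .set("es.http.retries", "3")       // retries for a broken HTTP connection, per node
  .set("es.nodes.discovery", "true") // discover and use all nodes in the cluster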
We are using the elasticsearch-spark_2.10-2.1.0.jar. This is how our Spark context is created:
val conf = new SparkConf()
conf.set("es.nodes", config.elasticsearch_nodes)
conf.set("es.batch.write.retry.count", "-1") // negative = retry rejected bulks forever
conf.set("es.batch.write.retry.wait", "100") // wait between bulk retries
val sc = new SparkContext(conf)
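In case the size of each bulk request matters more than the rate, these are the batch-sizing knobs we could also tune (names from the elasticsearch-hadoop docs; the values are illustrative, not what we currently run):

import org.apache.spark.SparkConf

// Sketch: cap the size of each bulk request per Spark task
// (1mb and 1000 entries are the documented defaults).
val batchConf = new SparkConf()
  .set("es.batch.size.bytes", "1mb")   // flush once the bulk buffer reaches ~1 MB...
  .set("es.batch.size.entries", "500") // ...or 500 documents, whichever comes first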
It may be useful to know that Marvel shows the number of segments oscillating a bit below 100, and that we have set indices.store.throttle.type to "none".
So there are actually three questions here:
- How can we be seeing EsHadoopNoNodesLeftException with a negative value for retry.count? This is the critical issue: if we could get Spark to keep retrying, I would expect ES to eventually resume indexing.
- Why would ES fail well before getting close to its CPU and IO limits? Can we expect to get reasonably close to 100% CPU or 100% IO utilization while indexing?
- Is this simply not the way to build massive indexes? Would a "pull" architecture, with Spark dumping the data into some store and ES pulling from that store, be more reliable? Sadly, rivers are now deprecated, so I don't know how one would implement a pull pipeline; a sketch of what I have in mind follows below.
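To make the last question concrete, here is a minimal sketch of the decoupled pipeline I have in mind, using HDFS as the intermediate store. The paths, index name, and setting values are all hypothetical, and the two stages would run as separate jobs:

import org.apache.spark.{SparkConf, SparkContext}
import org.elasticsearch.spark._

// Stage 1 (producer): the enrichment job writes JSON lines to a durable
// store instead of talking to ES directly, e.g.:
//   enrichedJsonRdd.saveAsTextFile("hdfs:///staging/enriched")

// Stage 2 (consumer): a separate, independently throttled job reads the
// staged files and bulk-indexes them, so ES back-pressure can never fail
// the enrichment work. All names and values below are hypothetical.
val conf = new SparkConf()
  .set("es.nodes", "es-node-1:9200")
  .set("es.batch.size.entries", "500")
  .set("es.batch.write.retry.count", "3")
val sc = new SparkContext(conf)

sc.textFile("hdfs:///staging/enriched")
  .saveJsonToEs("myindex/mytype") // RDD implicit from org.elasticsearch.spark._

sc.stop()

Would something along these lines be the sanctioned replacement for rivers, or is there a better-established pattern?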