Elasticsearch Spark EsHadoopNoNodesLeftException in cluster Mode

Hi,

I am trying to index data through Apache Spark into Elasticsearch. Apache
Spark is used for data enrichment, and then I am using saveToEs to index
the data.

In local mode my code works fine, but when I run it in cluster mode with
1 master and 2 slaves, it indexes around 1700 documents and then fails
with the following error

org.apache.spark.SparkException: Job aborted due to stage failure: Task 9
in stage 1711.0 failed 4 times, most recent failure: Lost task 9.3 in stage
1711.0 (TID 27379, ip-1-169-15-116.ec2.internal):
org.elasticsearch.hadoop.rest.EsHadoopNoNodesLeftException: Connection
error (check network and/or proxy settings)- all nodes failed; tried
[[4.212.11.16:9200]]

My configuration is the following:

sparkConf.set("es.index.auto.create", "true")
sparkConf.set("es.nodes","1.1.1.1:9200") //my elasticserver ip 
sparkConf.set("spark.eventLog.enabled","true")
sparkConf.set("es.nodes.discovery", "false") 
finalData.foreach(row => {
  sc.makeRDD(Seq(row)).saveToEs("spark71/docs")
  println(row)
})
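
For comparison, a sketch of the more direct write (this assumes finalData is an RDD of Maps or serializable case classes, which may not match my actual types):

import org.elasticsearch.spark._

// Sketch only: write the whole RDD in one pass instead of building a
// one-element RDD per row inside foreach.
finalData.saveToEs("spark71/docs")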


Check the node status and see whether it behaves normally while data is
being loaded. If the load is too high and the node is not properly
configured, it could keep rejecting data, or a GC might be triggered,
causing es-hadoop to fail the job.
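
For example, one quick way to watch the node while the job is writing (a minimal sketch; it assumes the node is reachable at localhost:9200, which you would replace with your own address):

import scala.io.Source

// Poll JVM and thread-pool stats during the load to spot bulk rejections
// or long GC pauses. The host/port below is an assumption.
val stats = Source.fromURL("http://localhost:9200/_nodes/stats/jvm,thread_pool?pretty").mkString
println(stats)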


Hi,

I got a similar exception when I tried to index the data. I resolved it by configuring elasticsearch.yml as follows:

bootstrap.mlockall: true
index.number_of_shards: 40
index.term_index_interval: 1000
index.term_index_divisor: 5
index.translog.flush_threshold_period: 5s
index.refresh_interval: 5s
index.merge.policy.use_compound_file: false
index.compound_format: false
indices.memory.index_buffer_size: 40%

And add this configuration in your code when you initialize the SparkContext object:

sparkConf.set("es.batch.size.bytes", "300000000")
sparkConf.set("es.batch.size.entries", "10000")
sparkConf.set("es.batch.write.refresh", "false")
sparkConf.set("es.batch.write.retry.count", "50")
sparkConf.set("es.batch.write.retry.wait", "500")
sparkConf.set("es.http.timeout", "5m")
sparkConf.set("es.http.retries", "50")
sparkConf.set("es.action.heart.beat.lead", "50")

Apart from these, set ES_HEAP_SIZE as well. I think these settings will resolve your problem.

Thanks,
Sushmitha

I would actually recommend against this configuration, in particular on the es-hadoop side.
When a node keeps dropping, one should try minimizing the load and see whether that has any impact - the configuration
below does the opposite:

  • it increases the batch size to 300 million bytes, or around 286mb (by the way, use a unit to simplify the
    configuration and make it human readable - e.g. 286mb).
  • it increases the batch entry count to 10K.

Note that the above represent the upper limit of a batch for each Spark task; that is, at runtime one has to multiply
the batch size by the number of tasks to get the maximum amount of data being batched when all the tasks are running;
in this case it can be around 2860mb, or more than 2.5 GB, hitting the cluster at once, which is excessive.
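
Roughly (assuming, purely for illustration, 10 tasks writing concurrently):

// Back-of-the-envelope arithmetic; the task count is only an assumption.
val batchBytes = 300000000L                       // es.batch.size.bytes as configured
val batchMb    = batchBytes / (1024.0 * 1024.0)   // ~286mb per task
val tasks      = 10                               // illustrative concurrency
val totalMb    = batchMb * tasks                  // ~2860mb in flight against the cluster at once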

Bulk requests should be significantly smaller to allow ES to quickly ingest one and move to the next. As explained in
the docs, a bulk request should take 1-2s at most; otherwise either increase the hardware or reduce the bulk size.

Increasing the wait time/timeout and the number of retries is also counterproductive, since in case of an error it
simply takes longer for the client to react. These settings should be used when the network is throttled or to get
around occasional spikes in the system. Hammering the system with bulks that are way too big, and working around the
resulting rejections by increasing the wait time and the number of retries, does not solve your system's performance
problem but rather aggravates and postpones the issue.
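
As a starting point, something much closer to the defaults is usually a better baseline (the values below are only illustrative - tune them against your own cluster):

sparkConf.set("es.batch.size.bytes", "1mb")       // per-task limit, human-readable unit
sparkConf.set("es.batch.size.entries", "1000")    // cap on documents per bulk request
sparkConf.set("es.batch.write.retry.count", "3")  // fail fast instead of masking rejections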

Hope this helps,


--
Costin


Hi Costin,

Actually, I have to work with more than 100GB of data (approx.), i.e. I need to operate on data which is already indexed and index it again. Could you please tell me the ideal configuration for operating on such a huge amount of data? Or, if there is a link which covers all the relevant configuration options, please provide it.

Thanks,
Sushmitha

es-hadoop doesn't change things from the Elasticsearch perspective; it's just another client that interacts with the
cluster. The fact that you are reindexing data against the same cluster means you need to add some buffering, since
you are streaming and indexing data against the same hardware.

Whether you have a large or small volume of data, keeping the bulk size small (as in indexed quickly, in 1-2s) is
important.
If the cluster falls behind, it means things are being 'pushed' back - throttled, if you'd like. So don't just
increase the timeouts, but rather minimize the bulk size and/or increase the hardware.
Potentially have another cluster ready to index the data into, and then move it into your original cluster...
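
For instance, a minimal reindexing sketch (the index names are placeholders and the per-write overrides are just illustrative values):

import org.elasticsearch.spark._

// Read the existing documents, transform them as needed, and write them back
// in small bulks. "source_index/docs" and "target_index/docs" are placeholders.
val source = sc.esRDD("source_index/docs")
source.map { case (id, doc) => doc }
      .saveToEs("target_index/docs", Map(
        "es.batch.size.entries" -> "1000",
        "es.batch.size.bytes"   -> "1mb"))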

Cheers,


--
Costin


Hi Costin,

I am using the Spark native library to connect to Elasticsearch. For non-network issues we have the batch retry config; I read about it in the post: [SPARK] es.batch.write.retry.count negative value is ignored.

I am deliberately giving an invalid ES IP, and Spark errors out with the trace below. Since we are running in multi-cluster mode, catching the exception is not feasible (I have tried), as it goes into its own executor. Is there a config to set a network-related retry count? FYI, I am using Elasticsearch 2.3.2.

Any inputs related to this would be very helpful. :)

Exception stack trace:

org.apache.spark.SparkException: Job aborted due to stage failure: Task 2 in stage 1.0 failed 1 times, most recent failure: Lost task 2.0 in stage 1.0 (TID 3, localhost): org.elasticsearch.hadoop.EsHadoopIllegalArgumentException: Cannot detect ES version - typically this happens if the network/Elasticsearch cluster is not accessible or when targeting a WAN/Cloud instance without the proper setting 'es.nodes.wan.only'
at org.elasticsearch.hadoop.rest.InitializationUtils.discoverEsVersion(InitializationUtils.java:190)
at org.elasticsearch.hadoop.rest.RestService.createWriter(RestService.java:379)
at org.elasticsearch.spark.rdd.EsRDDWriter.write(EsRDDWriter.scala:40)
at org.elasticsearch.spark.rdd.EsSpark$$anonfun$saveToEs$1.apply(EsSpark.scala:67)
at org.elasticsearch.spark.rdd.EsSpark$$anonfun$saveToEs$1.apply(EsSpark.scala:67)
at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:66)
at org.apache.spark.scheduler.Task.run(Task.scala:89)
at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:227)
at java.util.concurrent.ThreadPoolExecutor.runWorker(Unknown Source)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(Unknown Source)
at java.lang.Thread.run(Unknown Source)
Caused by: org.elasticsearch.hadoop.rest.EsHadoopNoNodesLeftException: Connection error (check network and/or proxy settings)- all nodes failed; tried [[127.0.0.11:9200]]
at org.elasticsearch.hadoop.rest.NetworkClient.execute(NetworkClient.java:142)
at org.elasticsearch.hadoop.rest.RestClient.execute(RestClient.java:434)
at org.elasticsearch.hadoop.rest.RestClient.execute(RestClient.java:414)
at org.elasticsearch.hadoop.rest.RestClient.execute(RestClient.java:418)
at org.elasticsearch.hadoop.rest.RestClient.get(RestClient.java:122)
at org.elasticsearch.hadoop.rest.RestClient.esVersion(RestClient.java:564)
at org.elasticsearch.hadoop.rest.InitializationUtils.discoverEsVersion(InitializationUtils.java:178)
... 10 more