org.elasticsearch.hadoop.rest.EsHadoopNoNodesLeftException: Connection error (check network and/or proxy settings)- all nodes failed; tried

Hi,
I have started using the elasticsearch-hadoop Spark module, and I was able to pull data from my production AWS Elasticsearch service when running from my dev box (from IntelliJ).

But when I build a jar including all dependencies and run it on my Spark cluster, I get the error below:
16/07/22 21:26:16 INFO HttpMethodDirector: I/O exception (java.net.ConnectException) caught when processing request: Connection timed out
16/07/22 21:26:16 INFO HttpMethodDirector: Retrying request
16/07/22 21:26:16 INFO HttpMethodDirector: I/O exception (java.net.ConnectException) caught when processing request: Connection timed out
16/07/22 21:26:16 INFO HttpMethodDirector: Retrying request
and after a while I get the error below:
org.elasticsearch.hadoop.rest.EsHadoopNoNodesLeftException: Connection error (check network and/or proxy settings)- all nodes failed;[nodes with ip and port]

at org.elasticsearch.hadoop.rest.NetworkClient.execute(NetworkClient.java:143)
at org.elasticsearch.hadoop.rest.RestClient.execute(RestClient.java:447)
at org.elasticsearch.hadoop.rest.RestClient.execute(RestClient.java:439)
at org.elasticsearch.hadoop.rest.RestRepository.scroll(RestRepository.java:454)
at org.elasticsearch.hadoop.rest.ScrollQuery.hasNext(ScrollQuery.java:92)
at org.elasticsearch.spark.rdd.AbstractEsRDDIterator.hasNext(AbstractEsRDDIterator.scala:43)
at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:327)
at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:327)
at org.apache.spark.sql.execution.datasources.DefaultWriterContainer.writeRows(WriterContainer.scala:262)
at org.apache.spark.sql.execution.datasources.InsertIntoHadoopFsRelation$$anonfun$run$1$$anonfun$apply$mcV$sp$3.apply(InsertIntoHadoopFsRelation.scala:150)
at org.apache.spark.sql.execution.datasources.InsertIntoHadoopFsRelation$$anonfun$run$1$$anonfun$apply$mcV$sp$3.apply(InsertIntoHadoopFsRelation.scala:150)
at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:66)
at org.apache.spark.scheduler.Task.run(Task.scala:89)
at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:214)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
at java.lang.Thread.run(Thread.java:745)

Any help is much appreciated.

Hi Satyajit,

Can you confirm that all of your Spark nodes can connect to your Elasticsearch nodes?
Is the list of nodes shown in the error message correct? I had a similar issue some time ago where I had multiple configurations for Elasticsearch and there was some mix-up in the Spark context config.
It is also a good idea to include the version of elasticsearch-hadoop you are running.
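
In case it helps, here is a minimal sketch of how the connectivity question could be checked from inside the executors rather than from a shell on each machine. It only tests whether a plain TCP connection opens, so it does not go through any HTTP proxy. The names EsReachability and canConnect are made up for illustration, and the host in the usage comment is a placeholder, not a value from this thread.

```scala
import java.io.IOException
import java.net.{InetSocketAddress, Socket}

// Hypothetical helper (not part of elasticsearch-hadoop): checks raw TCP reachability.
object EsReachability {

  /** Returns true if a TCP connection to host:port opens within timeoutMs milliseconds. */
  def canConnect(host: String, port: Int, timeoutMs: Int = 5000): Boolean = {
    val socket = new Socket()
    try {
      socket.connect(new InetSocketAddress(host, port), timeoutMs)
      true
    } catch {
      // Covers connection refused, connect timeout and unknown host.
      case _: IOException => false
    } finally {
      socket.close()
    }
  }
}

// Possible usage on the cluster, assuming an existing SparkContext `sc`
// and a placeholder Elasticsearch address:
//
//   sc.parallelize(1 to 100, 100)
//     .map(_ => (java.net.InetAddress.getLocalHost.getHostName,
//                EsReachability.canConnect("<es-host>", 9200)))
//     .distinct()
//     .collect()
//     .foreach(println)
```

If some executor hosts report false while the driver machine reports true, that would point at a network or proxy difference between the machines rather than at the connector itself.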

Hi Larghir,

Thank you for responding.
Yes, I was able to access Elasticsearch from all nodes, and the nodes for which the connection failed are the correct Elasticsearch IPs.

I have pulled the master code from GitHub for elasticsearch-hadoop.
Could you please explain more about the multiple Elasticsearch configurations and the mix-up in the Spark context config?

Hi,

Please find the error logs below:

16/07/26 13:16:42 DEBUG ScalaEsRDD$: Partition reader instance [EsPartition [node=[76H3V5jPR3CbVsjvODVGPw/es05.v2|10...:9200],shard=0]] assigned to [76H3V5jPR3CbVsjvODVGPw]:[9200]
16/07/26 13:16:42 DEBUG NetworkClient: Opening (pinned) network client to 10...:9200
16/07/26 13:18:50 INFO HttpMethodDirector: I/O exception (java.net.ConnectException) caught when processing request: Connection timed out
16/07/26 13:18:50 INFO HttpMethodDirector: Retrying request

The same happens when es.nodes.wan.only is set to true.

import java.io.File

// ConfigFactory is assumed to come from Typesafe Config
import com.typesafe.config.ConfigFactory
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.sql.{Row, SQLContext}
import org.elasticsearch.spark._

object ListinglifecyclePull {

  // configPath, date, destinationPath, schema, cleanData, convertTimestamp
  // and createTimestamp are defined elsewhere (not shown here).
  def main(args: Array[String]): Unit = {
    // Read all connection settings from the external config file
    val config = ConfigFactory.parseFile(new File(configPath))
    val proxyHost = config.getString("proxyHost")
    val proxyPort = config.getString("proxyPort")
    val nodes = config.getString("nodes")
    val port = config.getString("port")
    val useSystemProxies = config.getString("useSystemProxies")
    val allowMultipleContexts = config.getString("allowMultipleContexts")
    val nodesdiscovery = config.getString("nodesdiscovery")
    val blockSize = config.getString("blockSize")
    val wan = config.getString("wan")
    val wanonly = config.getString("wanonly")

    val sparkConf = new SparkConf()
      .setAppName("ListinglifecyclePull")

    // HTTP proxy as JVM system properties (set in the JVM that runs main, i.e. the driver)
    System.setProperty("http.proxyHost", proxyHost)
    System.setProperty("http.proxyPort", proxyPort)
    sparkConf.set("es.nodes", nodes)
    sparkConf.set("es.port", port)
    sparkConf.set("java.net.useSystemProxies", useSystemProxies)
    sparkConf.set("spark.driver.allowMultipleContexts", allowMultipleContexts)
    sparkConf.set("es.nodes.discovery", nodesdiscovery)
    sparkConf.set("es.batch.size.entries", blockSize)
    sparkConf.set("es.nodes.wan.only", wan)
    sparkConf.set("wan.only.mode", wanonly)

    val sc = new SparkContext(sparkConf)
    val sqlContext = new SQLContext(sc)

    val listingLifeCycleSchemaString = "@timestamp,message,@version,Type,host,path,timestamp,log_level,city,close_price,errors,image_count,list_price,listing_source,listing_url,mls_id,mls_listing_date,mls_number,mls_property_type,mls_sysid,mls_updated_date,process_stage,process_status,property_id,property_type_code,stage_created_date,stage_listing_status,state,visibility,zipcode"

    // Index names use dots in the date, e.g. 2016.07.22
    val run_date = date.replaceAll("-", ".")
    val esquery = "{\"query\":{ \"match_all\":{} }}"
    val data = sc.esRDD("logstash-listing_lifecycle_monitor-DATE/listing_lifecycle_monitor".replaceAll("DATE", run_date), esquery)

    // Convert each (id, document) pair into a Row, one column per schema field
    val esdata = data.map {
      case (_, v) =>
        Row(
          cleanData(convertTimestamp(v("@timestamp"))),
          cleanData(v("message")),
          cleanData(v("@version")),
          cleanData(v("type")),
          cleanData(v("host")),
          cleanData(v("path")),
          cleanData(v("timestamp")),
          cleanData(v("log_level")),
          cleanData(v("city")),
          cleanData(v("close_price")),
          cleanData(v("errors")),
          cleanData(v("image_count")),
          cleanData(v("list_price")),
          cleanData(v("listing_source")),
          cleanData(v("listing_url")),
          cleanData(v("mls_id")),
          cleanData(v("mls_listing_date")),
          cleanData(v("mls_number")),
          cleanData(v("mls_property_type")),
          cleanData(v("mls_sysid")),
          if (cleanData((v("mls_updated_date"))) == None) "None" else cleanData((v("mls_updated_date"))),
          cleanData(v("process_stage")),
          cleanData(v("process_status")),
          cleanData(v("property_id")),
          cleanData(v("property_type_code")),
          cleanData(createTimestamp(v("stage_created_date").toString)),
          cleanData(v("stage_listing_status")),
          cleanData(v("state")),
          cleanData(v("visibility")),
          cleanData(v("zipcode")))
    }
    val df = sqlContext.createDataFrame(esdata, schema)
    df.write.parquet(destinationPath + date)
  }
}
Surprisingly, the same code works when I run it from IntelliJ. But when I build a jar and run it with spark-submit, it gives the above error.

Elasticsearch version 1.7.5 and elasticsearch-hadoop v5.0.0.BUILD-SNAPSHOT.

Regards,
Satyajit.
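
One thing that might be worth ruling out (a possibility, not a confirmed diagnosis): System.setProperty only affects the JVM it is called in, so on a cluster the executor JVMs would not see http.proxyHost and http.proxyPort set in main. The sketch below shows two ways the proxy could be handed to the executors instead, either through elasticsearch-hadoop's own es.net.proxy.http.host and es.net.proxy.http.port settings or through spark.executor.extraJavaOptions. The object name EsProxySettings and the proxy host and port values are placeholders made up for illustration.

```scala
// Hypothetical holder for proxy-related settings; the values are placeholders.
object EsProxySettings {
  val proxyHost = "proxy.example.internal" // placeholder, not from this thread
  val proxyPort = "3128"                   // placeholder, not from this thread

  // Option A: elasticsearch-hadoop's own proxy settings, which travel with
  // the job configuration instead of relying on JVM system properties.
  val connectorSettings: Map[String, String] = Map(
    "es.net.proxy.http.host" -> proxyHost,
    "es.net.proxy.http.port" -> proxyPort
  )

  // Option B: pass the JVM system properties to every executor JVM.
  val executorJvmSettings: Map[String, String] = Map(
    "spark.executor.extraJavaOptions" ->
      s"-Dhttp.proxyHost=$proxyHost -Dhttp.proxyPort=$proxyPort"
  )

  def main(args: Array[String]): Unit = {
    // Print the settings so they can be compared with what the job actually uses.
    (connectorSettings ++ executorJvmSettings).foreach {
      case (key, value) => println(s"$key=$value")
    }
  }
}

// Possible usage with an existing SparkConf `sparkConf`, before creating the context:
//   sparkConf.setAll(EsProxySettings.connectorSettings)
```

Either option alone should be enough to test the idea. If the executors need no proxy at all to reach the cluster, neither applies and the timeout has a different cause.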