Facing EsHadoopIllegalStateException when reading from an ES cluster

Hi,

I am running the code below with the following versions:

elasticsearch 1.5.2
spark 1.4
es-hadoop 2.2.0.BUILD-SNAPSHOT

import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.sql.DataFrame;
import org.apache.spark.sql.SQLContext;
import org.elasticsearch.spark.sql.api.java.JavaEsSparkSQL;

public class SimpleApp1 {

    public static void main(String[] args) {
        // Jars shipped to the Spark executors
        String[] jars = { "/opt/apps/spark/lib/elasticsearch-hadoop-2.2.0.BUILD-SNAPSHOT.jar",
                "/opt/apps/spark/lib/elasticsearch-spark_2.10-2.2.0.BUILD-20150707.024158-13.jar" };

        SparkConf conf = new SparkConf().setAppName("es")
                .setMaster("spark://ip-172-31-34-142:7077")
                .setJars(jars);
        // Point the connector at the ES nodes directly and disable node discovery
        conf.set("es.nodes", "172.31.34.142:9200,172.31.34.143:9200,172.31.34.144:9200,172.31.34.145:9200");
        conf.set("es.nodes.discovery", "false");

        JavaSparkContext jsc = new JavaSparkContext(conf);
        SQLContext sql = new SQLContext(jsc);

        // Load the index/type as a DataFrame and register it as a temp table
        DataFrame test = JavaEsSparkSQL.esDF(sql, "perf_static_index/browserrecord");
        test.registerTempTable("br");
        test.printSchema();

        // Query, count and print the results
        DataFrame df = sql.sql("select * from br");
        long resultCount = df.count();
        System.out.println("***************************************************");
        System.out.println(resultCount);
        System.out.println(df.collectAsList());
        System.out.println("***************************************************");
    }
}

It works absolutely fine with a single-node ES setup, but fails with the exception below when I point it at an Elasticsearch cluster.

4699 [dag-scheduler-event-loop] WARN  org.elasticsearch.hadoop.rest.RestRepository - Cannot find node with id [5LgCSo6jTIegkXzuJbxhnQ] (is HTTP enabled?) from shard [Shard[state=STARTED, primary=true, node=5LgCSo6jTIegkXzuJbxhnQ, name=2, index=perf_static_index]] in nodes [{nM81nZk4SB6WOPHeLZo9mA=Node[id=nM81nZk4SB6WOPHeLZo9mA, name=Randall Shire, ipAddress=172.31.34.142, httpPort=9200], e4TfnxiRSx6s16Byq6LN7Q=Node[id=e4TfnxiRSx6s16Byq6LN7Q, name=Wong, ipAddress=172.31.34.144, httpPort=9200], j8YVfi5pTaey-QuuyYTDkg=Node[id=j8YVfi5pTaey-QuuyYTDkg, name=Shriek, ipAddress=172.31.34.143, httpPort=9200]}]; layout [[[{state=STARTED, primary=true, node=nM81nZk4SB6WOPHeLZo9mA, relocating_node=null, shard=0, index=perf_static_index}, {state=STARTED, primary=false, node=e4TfnxiRSx6s16Byq6LN7Q, relocating_node=null, shard=0, index=perf_static_index}], [{state=STARTED, primary=true, node=e4TfnxiRSx6s16Byq6LN7Q, relocating_node=null, shard=1, index=perf_static_index}, {state=STARTED, primary=false, node=5LgCSo6jTIegkXzuJbxhnQ, relocating_node=null, shard=1, index=perf_static_index}], [{state=STARTED, primary=true, node=5LgCSo6jTIegkXzuJbxhnQ, relocating_node=null, shard=2, index=perf_static_index}, {state=STARTED, primary=false, node=j8YVfi5pTaey-QuuyYTDkg, relocating_node=null, shard=2, index=perf_static_index}], [{state=STARTED, primary=false, node=nM81nZk4SB6WOPHeLZo9mA, relocating_node=null, shard=3, index=perf_static_index}, {state=STARTED, primary=true, node=j8YVfi5pTaey-QuuyYTDkg, relocating_node=null, shard=3, index=perf_static_index}], [{state=STARTED, primary=true, node=nM81nZk4SB6WOPHeLZo9mA, relocating_node=null, shard=4, index=perf_static_index}, {state=STARTED, primary=false, node=e4TfnxiRSx6s16Byq6LN7Q, relocating_node=null, shard=4, index=perf_static_index}]]]
4709 [dag-scheduler-event-loop] WARN  org.elasticsearch.hadoop.rest.RestRepository - Cannot find node with id [5LgCSo6jTIegkXzuJbxhnQ] (is HTTP enabled?) from shard [Shard[state=STARTED, primary=false, node=5LgCSo6jTIegkXzuJbxhnQ, name=1, index=perf_static_index]] in nodes [{nM81nZk4SB6WOPHeLZo9mA=Node[id=nM81nZk4SB6WOPHeLZo9mA, name=Randall Shire, ipAddress=172.31.34.142, httpPort=9200], e4TfnxiRSx6s16Byq6LN7Q=Node[id=e4TfnxiRSx6s16Byq6LN7Q, name=Wong, ipAddress=172.31.34.144, httpPort=9200], j8YVfi5pTaey-QuuyYTDkg=Node[id=j8YVfi5pTaey-QuuyYTDkg, name=Shriek, ipAddress=172.31.34.143, httpPort=9200]}]; layout [[[{state=STARTED, primary=false, node=e4TfnxiRSx6s16Byq6LN7Q, relocating_node=null, shard=0, index=perf_static_index}, {state=STARTED, primary=true, node=nM81nZk4SB6WOPHeLZo9mA, relocating_node=null, shard=0, index=perf_static_index}], [{state=STARTED, primary=false, node=5LgCSo6jTIegkXzuJbxhnQ, relocating_node=null, shard=1, index=perf_static_index}, {state=STARTED, primary=true, node=e4TfnxiRSx6s16Byq6LN7Q, relocating_node=null, shard=1, index=perf_static_index}], [{state=STARTED, primary=false, node=j8YVfi5pTaey-QuuyYTDkg, relocating_node=null, shard=2, index=perf_static_index}, {state=STARTED, primary=true, node=5LgCSo6jTIegkXzuJbxhnQ, relocating_node=null, shard=2, index=perf_static_index}], [{state=STARTED, primary=true, node=j8YVfi5pTaey-QuuyYTDkg, relocating_node=null, shard=3, index=perf_static_index}, {state=STARTED, primary=false, node=nM81nZk4SB6WOPHeLZo9mA, relocating_node=null, shard=3, index=perf_static_index}], [{state=STARTED, primary=false, node=e4TfnxiRSx6s16Byq6LN7Q, relocating_node=null, shard=4, index=perf_static_index}, {state=STARTED, primary=true, node=nM81nZk4SB6WOPHeLZo9mA, relocating_node=null, shard=4, index=perf_static_index}]]]
4714 [dag-scheduler-event-loop] WARN  org.apache.spark.scheduler.DAGScheduler - Creating new stage failed due to exception - job: 0
org.elasticsearch.hadoop.EsHadoopIllegalStateException: Cluster state volatile; cannot find node backing shards - please check whether your cluster is stable
        at org.elasticsearch.hadoop.rest.RestRepository.getReadTargetShards(RestRepository.java:281) ~[elasticsearch-hadoop-2.2.0.BUILD-SNAPSHOT.jar:2.2.0.BUILD-SNAPSHOT]
        at org.elasticsearch.hadoop.rest.RestService.findPartitions(RestService.java:248) ~[elasticsearch-hadoop-2.2.0.BUILD-SNAPSHOT.jar:2.2.0.BUILD-SNAPSHOT]
        at org.elasticsearch.spark.rdd.AbstractEsRDD.esPartitions$lzycompute(AbstractEsRDD.scala:51) ~[elasticsearch-hadoop-2.2.0.BUILD-SNAPSHOT.jar:2.2.0.BUILD-SNAPSHOT]
        at org.elasticsearch.spark.rdd.AbstractEsRDD.esPartitions(AbstractEsRDD.scala:50) ~[elasticsearch-hadoop-2.2.0.BUILD-SNAPSHOT.jar:2.2.0.BUILD-SNAPSHOT]
        at org.elasticsearch.spark.rdd.AbstractEsRDD.getPartitions(AbstractEsRDD.scala:26) ~[elasticsearch-hadoop-2.2.0.BUILD-SNAPSHOT.jar:2.2.0.BUILD-SNAPSHOT]
        at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:219) ~[spark-core_2.10-1.4.0.jar:1.4.0]
        at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:217) ~[spark-core_2.10-1.4.0.jar:1.4.0]
        at scala.Option.getOrElse(Option.scala:120) ~[scala-library-2.10.4.jar:na]
        at org.apache.spark.rdd.RDD.partitions(RDD.scala:217) ~[spark-core_2.10-1.4.0.jar:1.4.0]
        at org.apache.spark.rdd.MapPartitionsRDD.getPartitions(MapPartitionsRDD.scala:32) ~[spark-core_2.10-1.4.0.jar:1.4.0]
        at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:219) ~[spark-core_2.10-1.4.0.jar:1.4.0]
        at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:217) ~[spark-core_2.10-1.4.0.jar:1.4.0]
        at scala.Option.getOrElse(Option.scala:120) ~[scala-library-2.10.4.jar:na]
        at org.apache.spark.rdd.RDD.partitions(RDD.scala:217) ~[spark-core_2.10-1.4.0.jar:1.4.0]
        at org.apache.spark.rdd.MapPartitionsRDD.getPartitions(MapPartitionsRDD.scala:32) ~[spark-core_2.10-1.4.0.jar:1.4.0]
        at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:219) ~[spark-core_2.10-1.4.0.jar:1.4.0]
        at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:217) ~[spark-core_2.10-1.4.0.jar:1.4.0]
        at scala.Option.getOrElse(Option.scala:120) ~[scala-library-2.10.4.jar:na]
        at org.apache.spark.rdd.RDD.partitions(RDD.scala:217) ~[spark-core_2.10-1.4.0.jar:1.4.0]
        at org.apache.spark.rdd.MapPartitionsRDD.getPartitions(MapPartitionsRDD.scala:32) ~[spark-core_2.10-1.4.0.jar:1.4.0]
        at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:219) ~[spark-core_2.10-1.4.0.jar:1.4.0]
        at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:217) ~[spark-core_2.10-1.4.0.jar:1.4.0]
        at scala.Option.getOrElse(Option.scala:120) ~[scala-library-2.10.4.jar:na]
        at org.apache.spark.rdd.RDD.partitions(RDD.scala:217) ~[spark-core_2.10-1.4.0.jar:1.4.0]
        at org.apache.spark.rdd.MapPartitionsRDD.getPartitions(MapPartitionsRDD.scala:32) ~[spark-core_2.10-1.4.0.jar:1.4.0]
        at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:219) ~[spark-core_2.10-1.4.0.jar:1.4.0]
        at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:217) ~[spark-core_2.10-1.4.0.jar:1.4.0]
        at scala.Option.getOrElse(Option.scala:120) ~[scala-library-2.10.4.jar:na]
        at org.apache.spark.rdd.RDD.partitions(RDD.scala:217) ~[spark-core_2.10-1.4.0.jar:1.4.0]
        at org.apache.spark.ShuffleDependency.<init>(Dependency.scala:82) ~[spark-core_2.10-1.4.0.jar:1.4.0]
        at org.apache.spark.rdd.ShuffledRDD.getDependencies(ShuffledRDD.scala:78) ~[spark-core_2.10-1.4.0.jar:1.4.0]
        at org.apache.spark.rdd.RDD$$anonfun$dependencies$2.apply(RDD.scala:206) ~[spark-core_2.10-1.4.0.jar:1.4.0]
        at org.apache.spark.rdd.RDD$$anonfun$dependencies$2.apply(RDD.scala:204) ~[spark-core_2.10-1.4.0.jar:1.4.0]
        at scala.Option.getOrElse(Option.scala:120) ~[scala-library-2.10.4.jar:na]
        at org.apache.spark.rdd.RDD.dependencies(RDD.scala:204) ~[spark-core_2.10-1.4.0.jar:1.4.0]
        at org.apache.spark.scheduler.DAGScheduler.visit$1(DAGScheduler.scala:321) ~[spark-core_2.10-1.4.0.jar:1.4.0]
        at org.apache.spark.scheduler.DAGScheduler.getParentStages(DAGScheduler.scala:333) ~[spark-core_2.10-1.4.0.jar:1.4.0]
        at org.apache.spark.scheduler.DAGScheduler.getParentStagesAndId(DAGScheduler.scala:234) ~[spark-core_2.10-1.4.0.jar:1.4.0]
        at org.apache.spark.scheduler.DAGScheduler.newResultStage(DAGScheduler.scala:270) ~[spark-core_2.10-1.4.0.jar:1.4.0]
        at org.apache.spark.scheduler.DAGScheduler.handleJobSubmitted(DAGScheduler.scala:768) ~[spark-core_2.10-1.4.0.jar:1.4.0]
        at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1419) [spark-core_2.10-1.4.0.jar:1.4.0]
        at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1411) [spark-core_2.10-1.4.0.jar:1.4.0]
        at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:48) [spark-core_2.10-1.4.0.jar:1.4.0]
4716 [main] INFO  org.apache.spark.scheduler.DAGScheduler - Job 0 failed: count at SimpleApp1.java:25, took 0.077652 s
Exception in thread "main" org.elasticsearch.hadoop.EsHadoopIllegalStateException: Cluster state volatile; cannot find node backing shards - please check whether your cluster is stable
        at org.elasticsearch.hadoop.rest.RestRepository.getReadTargetShards(RestRepository.java:281)
        at org.elasticsearch.hadoop.rest.RestService.findPartitions(RestService.java:248)
        at org.elasticsearch.spark.rdd.AbstractEsRDD.esPartitions$lzycompute(AbstractEsRDD.scala:51)
        at org.elasticsearch.spark.rdd.AbstractEsRDD.esPartitions(AbstractEsRDD.scala:50)
        at org.elasticsearch.spark.rdd.AbstractEsRDD.getPartitions(AbstractEsRDD.scala:26)
        at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:219)
        at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:217)
        at scala.Option.getOrElse(Option.scala:120)
        at org.apache.spark.rdd.RDD.partitions(RDD.scala:217)
        at org.apache.spark.rdd.MapPartitionsRDD.getPartitions(MapPartitionsRDD.scala:32)
        at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:219)
        at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:217)
        at scala.Option.getOrElse(Option.scala:120)

Here are the cluster details:
each node is an HTTP-enabled data node.

host                                        ip            heap.percent ram.percent load node.role master name          
ip-172-31-34-144.us-west-2.compute.internal 172.31.34.144            2                  d         *      Wong          
ip-172-31-34-142.us-west-2.compute.internal 172.31.34.142            0                  d         m      Randall Shire 
ip-172-31-34-143.us-west-2.compute.internal 172.31.34.143            3                  d         m      Shriek        
ip-172-31-34-145.us-west-2.compute.internal 172.31.34.145            2                  d         -      Captain Zero 

I also tried running it with es.nodes.client.only = true, which led to this error:

4939 [main] INFO  org.apache.spark.scheduler.DAGScheduler - Job 0 failed: count at SimpleApp1.java:26, took 0.024947 s
Exception in thread "main" org.elasticsearch.hadoop.EsHadoopIllegalArgumentException: Client-only routing specified but no client nodes with HTTP-enabled were found in the cluster...
        at org.elasticsearch.hadoop.rest.InitializationUtils.filterNonClientNodesIfNeeded(InitializationUtils.java:82)
        at org.elasticsearch.hadoop.rest.RestService.findPartitions(RestService.java:228)
        at org.elasticsearch.spark.rdd.AbstractEsRDD.esPartitions$lzycompute(AbstractEsRDD.scala:51)
        at org.elasticsearch.spark.rdd.AbstractEsRDD.esPartitions(AbstractEsRDD.scala:50)
        at org.elasticsearch.spark.rdd.AbstractEsRDD.getPartitions(AbstractEsRDD.scala:26)
        at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:219)
        at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:217)
        at scala.Option.getOrElse(Option.scala:120)
        at org.apache.spark.rdd.RDD.partitions(RDD.scala:217)
        at org.apache.spark.rdd.MapPartitionsRDD.getPartitions(MapPartitionsRDD.scala:32)
        at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:219)
        at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:217)
        at scala.Option.getOrElse(Option.scala:120)
        at org.apache.spark.rdd.RDD.partitions(RDD.scala:217)
        at org.apache.spark.rdd.MapPartitionsRDD.getPartitions(MapPartitionsRDD.scala:32)
        at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:219)
        at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:217)
        at scala.Option.getOrElse(Option.scala:120)
        at org.apache.spark.rdd.RDD.partitions(RDD.scala:217)
        at org.apache.spark.rdd.MapPartitionsRDD.getPartitions(MapPartitionsRDD.scala:32)
        at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:219)
        at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:217)
        at scala.Option.getOrElse(Option.scala:120)
        at org.apache.spark.rdd.RDD.partitions(RDD.scala:217)
        at org.apache.spark.rdd.MapPartitionsRDD.getPartitions(MapPartitionsRDD.scala:32)
        at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:219)
        at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:217)
        at scala.Option.getOrElse(Option.scala:120)
        at org.apache.spark.rdd.RDD.partitions(RDD.scala:217)
        at org.apache.spark.ShuffleDependency.<init>(Dependency.scala:82)
        at org.apache.spark.rdd.ShuffledRDD.getDependencies(ShuffledRDD.scala:78)
        at org.apache.spark.rdd.RDD$$anonfun$dependencies$2.apply(RDD.scala:206)
        at org.apache.spark.rdd.RDD$$anonfun$dependencies$2.apply(RDD.scala:204)
        at scala.Option.getOrElse(Option.scala:120)
        at org.apache.spark.rdd.RDD.dependencies(RDD.scala:204)
        at org.apache.spark.scheduler.DAGScheduler.visit$1(DAGScheduler.scala:321)
        at org.apache.spark.scheduler.DAGScheduler.getParentStages(DAGScheduler.scala:333)
        at org.apache.spark.scheduler.DAGScheduler.getParentStagesAndId(DAGScheduler.scala:234)
        at org.apache.spark.scheduler.DAGScheduler.newResultStage(DAGScheduler.scala:270)
        at org.apache.spark.scheduler.DAGScheduler.handleJobSubmitted(DAGScheduler.scala:768)
        at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1419)
        at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1411)
        at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:48)
4945 [Thread-0] INFO  org.apache.spark.SparkContext - Invoking stop() from shutdown hook

First off, use only one jar - either the hadoop one or the spark one, not both.
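For example, a minimal sketch keeping only the Spark-specific artifact (reusing the paths from your snippet):

// Sketch: ship only the elasticsearch-spark jar to the executors; drop the
// elasticsearch-hadoop jar so the two artifacts do not end up on the classpath together.
String[] jars = { "/opt/apps/spark/lib/elasticsearch-spark_2.10-2.2.0.BUILD-20150707.024158-13.jar" };
SparkConf conf = new SparkConf().setAppName("es")
        .setMaster("spark://ip-172-31-34-142:7077")
        .setJars(jars);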

Second, the issue occurs because you are running Elasticsearch in AWS and, as with many cloud providers, there is a difference between the advertised, public IP of the nodes and the actual IPs where Elasticsearch runs.
In other words, the connector hits Elasticsearch on a public IP, asks for the location of the shards and retrieves an internal EC2 IP which is not accessible from Spark.

This can be fixed by configuring Elasticsearch to publish the proper/public IPs, as explained in the reference documentation.
Furthermore, I assume you are familiar with the cloud-aws plugin; if not, please try it out.
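As a rough sketch of what that looks like (the value below is a placeholder, not a real address - use whatever address the Spark workers can actually reach, and check the networking section of the reference documentation for your ES version):

# elasticsearch.yml on each node (sketch, placeholder value)
# publish an address reachable from the Spark workers instead of the internal EC2 IP
network.publish_host: <address-reachable-from-spark>
# with the cloud-aws plugin installed, an EC2-aware value such as _ec2:publicIp_
# may be usable here instead of a hard-coded address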

Thanks @costin, I will try this.