Hi,
I am running the code below with the following versions:
Elasticsearch 1.5.2
Spark 1.4
es-hadoop 2.2.0.BUILD-SNAPSHOT
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.sql.DataFrame;
import org.apache.spark.sql.SQLContext;
import org.elasticsearch.spark.sql.api.java.JavaEsSparkSQL;

public class SimpleApp1 {
    public static void main(String[] args) {
        // Ship the es-hadoop jars to the executors.
        String[] jars = { "/opt/apps/spark/lib/elasticsearch-hadoop-2.2.0.BUILD-SNAPSHOT.jar",
                "/opt/apps/spark/lib/elasticsearch-spark_2.10-2.2.0.BUILD-20150707.024158-13.jar" };

        SparkConf conf = new SparkConf()
                .setAppName("es")
                .setMaster("spark://ip-172-31-34-142:7077")
                .setJars(jars);
        // Declare every ES node explicitly and disable node discovery.
        conf.set("es.nodes", "172.31.34.142:9200,172.31.34.143:9200,172.31.34.144:9200,172.31.34.145:9200");
        conf.set("es.nodes.discovery", "false");

        JavaSparkContext jsc = new JavaSparkContext(conf);
        SQLContext sql = new SQLContext(jsc);

        // Load the index as a DataFrame and run a trivial query over it.
        DataFrame test = JavaEsSparkSQL.esDF(sql, "perf_static_index/browserrecord");
        test.registerTempTable("br");
        test.printSchema();

        DataFrame df = sql.sql("select * from br");
        long resultCount = df.count();
        System.out.println("***************************************************");
        System.out.println(resultCount);
        System.out.println(df.collectAsList());
        System.out.println("***************************************************");
    }
}
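For reference, the same read expressed through Spark 1.4's generic DataFrameReader goes through the same connector code (the trace below ends up in RestService.findPartitions either way), so I'd expect identical behavior; a minimal, untested sketch:

// Alternative entry point via SQLContext.read() (Spark 1.4 DataFrameReader).
// Untested sketch; inherits the es.nodes settings from the SparkConf above.
DataFrame viaReader = sql.read()
        .format("org.elasticsearch.spark.sql")
        .load("perf_static_index/browserrecord");
viaReader.printSchema();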
It works absolutely fine against a single-node ES setup, but fails with the exception below when I point it at the Elasticsearch cluster:
4699 [dag-scheduler-event-loop] WARN org.elasticsearch.hadoop.rest.RestRepository - Cannot find node with id [5LgCSo6jTIegkXzuJbxhnQ] (is HTTP enabled?) from shard [Shard[state=STARTED, primary=true, node=5LgCSo6jTIegkXzuJbxhnQ, name=2, index=perf_static_index]] in nodes [{nM81nZk4SB6WOPHeLZo9mA=Node[id=nM81nZk4SB6WOPHeLZo9mA, name=Randall Shire, ipAddress=172.31.34.142, httpPort=9200], e4TfnxiRSx6s16Byq6LN7Q=Node[id=e4TfnxiRSx6s16Byq6LN7Q, name=Wong, ipAddress=172.31.34.144, httpPort=9200], j8YVfi5pTaey-QuuyYTDkg=Node[id=j8YVfi5pTaey-QuuyYTDkg, name=Shriek, ipAddress=172.31.34.143, httpPort=9200]}]; layout [[[{state=STARTED, primary=true, node=nM81nZk4SB6WOPHeLZo9mA, relocating_node=null, shard=0, index=perf_static_index}, {state=STARTED, primary=false, node=e4TfnxiRSx6s16Byq6LN7Q, relocating_node=null, shard=0, index=perf_static_index}], [{state=STARTED, primary=true, node=e4TfnxiRSx6s16Byq6LN7Q, relocating_node=null, shard=1, index=perf_static_index}, {state=STARTED, primary=false, node=5LgCSo6jTIegkXzuJbxhnQ, relocating_node=null, shard=1, index=perf_static_index}], [{state=STARTED, primary=true, node=5LgCSo6jTIegkXzuJbxhnQ, relocating_node=null, shard=2, index=perf_static_index}, {state=STARTED, primary=false, node=j8YVfi5pTaey-QuuyYTDkg, relocating_node=null, shard=2, index=perf_static_index}], [{state=STARTED, primary=false, node=nM81nZk4SB6WOPHeLZo9mA, relocating_node=null, shard=3, index=perf_static_index}, {state=STARTED, primary=true, node=j8YVfi5pTaey-QuuyYTDkg, relocating_node=null, shard=3, index=perf_static_index}], [{state=STARTED, primary=true, node=nM81nZk4SB6WOPHeLZo9mA, relocating_node=null, shard=4, index=perf_static_index}, {state=STARTED, primary=false, node=e4TfnxiRSx6s16Byq6LN7Q, relocating_node=null, shard=4, index=perf_static_index}]]]
4709 [dag-scheduler-event-loop] WARN org.elasticsearch.hadoop.rest.RestRepository - Cannot find node with id [5LgCSo6jTIegkXzuJbxhnQ] (is HTTP enabled?) from shard [Shard[state=STARTED, primary=false, node=5LgCSo6jTIegkXzuJbxhnQ, name=1, index=perf_static_index]] in nodes [{nM81nZk4SB6WOPHeLZo9mA=Node[id=nM81nZk4SB6WOPHeLZo9mA, name=Randall Shire, ipAddress=172.31.34.142, httpPort=9200], e4TfnxiRSx6s16Byq6LN7Q=Node[id=e4TfnxiRSx6s16Byq6LN7Q, name=Wong, ipAddress=172.31.34.144, httpPort=9200], j8YVfi5pTaey-QuuyYTDkg=Node[id=j8YVfi5pTaey-QuuyYTDkg, name=Shriek, ipAddress=172.31.34.143, httpPort=9200]}]; layout [[[{state=STARTED, primary=false, node=e4TfnxiRSx6s16Byq6LN7Q, relocating_node=null, shard=0, index=perf_static_index}, {state=STARTED, primary=true, node=nM81nZk4SB6WOPHeLZo9mA, relocating_node=null, shard=0, index=perf_static_index}], [{state=STARTED, primary=false, node=5LgCSo6jTIegkXzuJbxhnQ, relocating_node=null, shard=1, index=perf_static_index}, {state=STARTED, primary=true, node=e4TfnxiRSx6s16Byq6LN7Q, relocating_node=null, shard=1, index=perf_static_index}], [{state=STARTED, primary=false, node=j8YVfi5pTaey-QuuyYTDkg, relocating_node=null, shard=2, index=perf_static_index}, {state=STARTED, primary=true, node=5LgCSo6jTIegkXzuJbxhnQ, relocating_node=null, shard=2, index=perf_static_index}], [{state=STARTED, primary=true, node=j8YVfi5pTaey-QuuyYTDkg, relocating_node=null, shard=3, index=perf_static_index}, {state=STARTED, primary=false, node=nM81nZk4SB6WOPHeLZo9mA, relocating_node=null, shard=3, index=perf_static_index}], [{state=STARTED, primary=false, node=e4TfnxiRSx6s16Byq6LN7Q, relocating_node=null, shard=4, index=perf_static_index}, {state=STARTED, primary=true, node=nM81nZk4SB6WOPHeLZo9mA, relocating_node=null, shard=4, index=perf_static_index}]]]
4714 [dag-scheduler-event-loop] WARN org.apache.spark.scheduler.DAGScheduler - Creating new stage failed due to exception - job: 0
org.elasticsearch.hadoop.EsHadoopIllegalStateException: Cluster state volatile; cannot find node backing shards - please check whether your cluster is stable
at org.elasticsearch.hadoop.rest.RestRepository.getReadTargetShards(RestRepository.java:281) ~[elasticsearch-hadoop-2.2.0.BUILD-SNAPSHOT.jar:2.2.0.BUILD-SNAPSHOT]
at org.elasticsearch.hadoop.rest.RestService.findPartitions(RestService.java:248) ~[elasticsearch-hadoop-2.2.0.BUILD-SNAPSHOT.jar:2.2.0.BUILD-SNAPSHOT]
at org.elasticsearch.spark.rdd.AbstractEsRDD.esPartitions$lzycompute(AbstractEsRDD.scala:51) ~[elasticsearch-hadoop-2.2.0.BUILD-SNAPSHOT.jar:2.2.0.BUILD-SNAPSHOT]
at org.elasticsearch.spark.rdd.AbstractEsRDD.esPartitions(AbstractEsRDD.scala:50) ~[elasticsearch-hadoop-2.2.0.BUILD-SNAPSHOT.jar:2.2.0.BUILD-SNAPSHOT]
at org.elasticsearch.spark.rdd.AbstractEsRDD.getPartitions(AbstractEsRDD.scala:26) ~[elasticsearch-hadoop-2.2.0.BUILD-SNAPSHOT.jar:2.2.0.BUILD-SNAPSHOT]
at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:219) ~[spark-core_2.10-1.4.0.jar:1.4.0]
at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:217) ~[spark-core_2.10-1.4.0.jar:1.4.0]
at scala.Option.getOrElse(Option.scala:120) ~[scala-library-2.10.4.jar:na]
at org.apache.spark.rdd.RDD.partitions(RDD.scala:217) ~[spark-core_2.10-1.4.0.jar:1.4.0]
at org.apache.spark.rdd.MapPartitionsRDD.getPartitions(MapPartitionsRDD.scala:32) ~[spark-core_2.10-1.4.0.jar:1.4.0]
at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:219) ~[spark-core_2.10-1.4.0.jar:1.4.0]
at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:217) ~[spark-core_2.10-1.4.0.jar:1.4.0]
at scala.Option.getOrElse(Option.scala:120) ~[scala-library-2.10.4.jar:na]
at org.apache.spark.rdd.RDD.partitions(RDD.scala:217) ~[spark-core_2.10-1.4.0.jar:1.4.0]
at org.apache.spark.rdd.MapPartitionsRDD.getPartitions(MapPartitionsRDD.scala:32) ~[spark-core_2.10-1.4.0.jar:1.4.0]
at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:219) ~[spark-core_2.10-1.4.0.jar:1.4.0]
at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:217) ~[spark-core_2.10-1.4.0.jar:1.4.0]
at scala.Option.getOrElse(Option.scala:120) ~[scala-library-2.10.4.jar:na]
at org.apache.spark.rdd.RDD.partitions(RDD.scala:217) ~[spark-core_2.10-1.4.0.jar:1.4.0]
at org.apache.spark.rdd.MapPartitionsRDD.getPartitions(MapPartitionsRDD.scala:32) ~[spark-core_2.10-1.4.0.jar:1.4.0]
at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:219) ~[spark-core_2.10-1.4.0.jar:1.4.0]
at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:217) ~[spark-core_2.10-1.4.0.jar:1.4.0]
at scala.Option.getOrElse(Option.scala:120) ~[scala-library-2.10.4.jar:na]
at org.apache.spark.rdd.RDD.partitions(RDD.scala:217) ~[spark-core_2.10-1.4.0.jar:1.4.0]
at org.apache.spark.rdd.MapPartitionsRDD.getPartitions(MapPartitionsRDD.scala:32) ~[spark-core_2.10-1.4.0.jar:1.4.0]
at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:219) ~[spark-core_2.10-1.4.0.jar:1.4.0]
at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:217) ~[spark-core_2.10-1.4.0.jar:1.4.0]
at scala.Option.getOrElse(Option.scala:120) ~[scala-library-2.10.4.jar:na]
at org.apache.spark.rdd.RDD.partitions(RDD.scala:217) ~[spark-core_2.10-1.4.0.jar:1.4.0]
at org.apache.spark.ShuffleDependency.<init>(Dependency.scala:82) ~[spark-core_2.10-1.4.0.jar:1.4.0]
at org.apache.spark.rdd.ShuffledRDD.getDependencies(ShuffledRDD.scala:78) ~[spark-core_2.10-1.4.0.jar:1.4.0]
at org.apache.spark.rdd.RDD$$anonfun$dependencies$2.apply(RDD.scala:206) ~[spark-core_2.10-1.4.0.jar:1.4.0]
at org.apache.spark.rdd.RDD$$anonfun$dependencies$2.apply(RDD.scala:204) ~[spark-core_2.10-1.4.0.jar:1.4.0]
at scala.Option.getOrElse(Option.scala:120) ~[scala-library-2.10.4.jar:na]
at org.apache.spark.rdd.RDD.dependencies(RDD.scala:204) ~[spark-core_2.10-1.4.0.jar:1.4.0]
at org.apache.spark.scheduler.DAGScheduler.visit$1(DAGScheduler.scala:321) ~[spark-core_2.10-1.4.0.jar:1.4.0]
at org.apache.spark.scheduler.DAGScheduler.getParentStages(DAGScheduler.scala:333) ~[spark-core_2.10-1.4.0.jar:1.4.0]
at org.apache.spark.scheduler.DAGScheduler.getParentStagesAndId(DAGScheduler.scala:234) ~[spark-core_2.10-1.4.0.jar:1.4.0]
at org.apache.spark.scheduler.DAGScheduler.newResultStage(DAGScheduler.scala:270) ~[spark-core_2.10-1.4.0.jar:1.4.0]
at org.apache.spark.scheduler.DAGScheduler.handleJobSubmitted(DAGScheduler.scala:768) ~[spark-core_2.10-1.4.0.jar:1.4.0]
at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1419) [spark-core_2.10-1.4.0.jar:1.4.0]
at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1411) [spark-core_2.10-1.4.0.jar:1.4.0]
at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:48) [spark-core_2.10-1.4.0.jar:1.4.0]
4716 [main] INFO org.apache.spark.scheduler.DAGScheduler - Job 0 failed: count at SimpleApp1.java:25, took 0.077652 s
Exception in thread "main" org.elasticsearch.hadoop.EsHadoopIllegalStateException: Cluster state volatile; cannot find node backing shards - please check whether your cluster is stable
at org.elasticsearch.hadoop.rest.RestRepository.getReadTargetShards(RestRepository.java:281)
at org.elasticsearch.hadoop.rest.RestService.findPartitions(RestService.java:248)
at org.elasticsearch.spark.rdd.AbstractEsRDD.esPartitions$lzycompute(AbstractEsRDD.scala:51)
at org.elasticsearch.spark.rdd.AbstractEsRDD.esPartitions(AbstractEsRDD.scala:50)
at org.elasticsearch.spark.rdd.AbstractEsRDD.getPartitions(AbstractEsRDD.scala:26)
at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:219)
at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:217)
at scala.Option.getOrElse(Option.scala:120)
at org.apache.spark.rdd.RDD.partitions(RDD.scala:217)
at org.apache.spark.rdd.MapPartitionsRDD.getPartitions(MapPartitionsRDD.scala:32)
at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:219)
at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:217)
at scala.Option.getOrElse(Option.scala:120)
Here are the cluster details; each node is an HTTP-enabled data node.
host                                        ip            heap.percent ram.percent load node.role master name
ip-172-31-34-144.us-west-2.compute.internal 172.31.34.144            2                            d         *      Wong
ip-172-31-34-142.us-west-2.compute.internal 172.31.34.142            0                            d         m      Randall Shire
ip-172-31-34-143.us-west-2.compute.internal 172.31.34.143            3                            d         m      Shriek
ip-172-31-34-145.us-west-2.compute.internal 172.31.34.145            2                            d         -      Captain Zero
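Given the "is HTTP enabled?" hint in the warning, I double-checked which node ids the cluster actually publishes over HTTP. By elimination from the node map in the log above, the unresolved id 5LgCSo6jTIegkXzuJbxhnQ appears to be Captain Zero on 172.31.34.145. A quick standalone sketch of the check (plain HTTP on port 9200 assumed):

import java.io.BufferedReader;
import java.io.InputStreamReader;
import java.net.HttpURLConnection;
import java.net.URL;

// Lists the node ids ES 1.x publishes via the /_nodes/http API, so I can see
// whether 5LgCSo6jTIegkXzuJbxhnQ shows up with an http_address at all.
public class NodesHttpCheck {
    public static void main(String[] args) throws Exception {
        URL url = new URL("http://172.31.34.142:9200/_nodes/http");
        HttpURLConnection conn = (HttpURLConnection) url.openConnection();
        conn.setRequestMethod("GET");
        StringBuilder body = new StringBuilder();
        try (BufferedReader in = new BufferedReader(
                new InputStreamReader(conn.getInputStream(), "UTF-8"))) {
            String line;
            while ((line = in.readLine()) != null) {
                body.append(line);
            }
        }
        // A node id missing here (or present without an http_address field)
        // cannot be resolved by es-hadoop when it maps shards to HTTP endpoints.
        System.out.println(body);
        System.out.println("contains 5LgCSo6jTIegkXzuJbxhnQ? "
                + body.toString().contains("5LgCSo6jTIegkXzuJbxhnQ"));
    }
}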
I also ran it with es.nodes.client.only = true, which led to this error:
4939 [main] INFO org.apache.spark.scheduler.DAGScheduler - Job 0 failed: count at SimpleApp1.java:26, took 0.024947 s
Exception in thread "main" org.elasticsearch.hadoop.EsHadoopIllegalArgumentException: Client-only routing specified but no client nodes with HTTP-enabled were found in the cluster...
at org.elasticsearch.hadoop.rest.InitializationUtils.filterNonClientNodesIfNeeded(InitializationUtils.java:82)
at org.elasticsearch.hadoop.rest.RestService.findPartitions(RestService.java:228)
at org.elasticsearch.spark.rdd.AbstractEsRDD.esPartitions$lzycompute(AbstractEsRDD.scala:51)
at org.elasticsearch.spark.rdd.AbstractEsRDD.esPartitions(AbstractEsRDD.scala:50)
at org.elasticsearch.spark.rdd.AbstractEsRDD.getPartitions(AbstractEsRDD.scala:26)
at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:219)
at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:217)
at scala.Option.getOrElse(Option.scala:120)
at org.apache.spark.rdd.RDD.partitions(RDD.scala:217)
at org.apache.spark.rdd.MapPartitionsRDD.getPartitions(MapPartitionsRDD.scala:32)
at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:219)
at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:217)
at scala.Option.getOrElse(Option.scala:120)
at org.apache.spark.rdd.RDD.partitions(RDD.scala:217)
at org.apache.spark.rdd.MapPartitionsRDD.getPartitions(MapPartitionsRDD.scala:32)
at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:219)
at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:217)
at scala.Option.getOrElse(Option.scala:120)
at org.apache.spark.rdd.RDD.partitions(RDD.scala:217)
at org.apache.spark.rdd.MapPartitionsRDD.getPartitions(MapPartitionsRDD.scala:32)
at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:219)
at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:217)
at scala.Option.getOrElse(Option.scala:120)
at org.apache.spark.rdd.RDD.partitions(RDD.scala:217)
at org.apache.spark.rdd.MapPartitionsRDD.getPartitions(MapPartitionsRDD.scala:32)
at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:219)
at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:217)
at scala.Option.getOrElse(Option.scala:120)
at org.apache.spark.rdd.RDD.partitions(RDD.scala:217)
at org.apache.spark.ShuffleDependency.<init>(Dependency.scala:82)
at org.apache.spark.rdd.ShuffledRDD.getDependencies(ShuffledRDD.scala:78)
at org.apache.spark.rdd.RDD$$anonfun$dependencies$2.apply(RDD.scala:206)
at org.apache.spark.rdd.RDD$$anonfun$dependencies$2.apply(RDD.scala:204)
at scala.Option.getOrElse(Option.scala:120)
at org.apache.spark.rdd.RDD.dependencies(RDD.scala:204)
at org.apache.spark.scheduler.DAGScheduler.visit$1(DAGScheduler.scala:321)
at org.apache.spark.scheduler.DAGScheduler.getParentStages(DAGScheduler.scala:333)
at org.apache.spark.scheduler.DAGScheduler.getParentStagesAndId(DAGScheduler.scala:234)
at org.apache.spark.scheduler.DAGScheduler.newResultStage(DAGScheduler.scala:270)
at org.apache.spark.scheduler.DAGScheduler.handleJobSubmitted(DAGScheduler.scala:768)
at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1419)
at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1411)
at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:48)
4945 [Thread-0] INFO org.apache.spark.SparkContext - Invoking stop() from shutdown hook
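What I plan to try next is loosening the routing settings, since the failure happens while the connector maps shards to HTTP-enabled nodes. A sketch of the variations (whether each key exists in this particular 2.2.0.BUILD-SNAPSHOT is an assumption on my part):

// Routing variations to experiment with; each key's availability in this
// BUILD-SNAPSHOT is an assumption, not something I have verified.
SparkConf conf2 = new SparkConf().setAppName("es");
conf2.set("es.nodes", "172.31.34.142:9200");
conf2.set("es.nodes.discovery", "true");    // let the connector discover all HTTP nodes itself
// conf2.set("es.nodes.data.only", "true"); // 2.x default: route requests only to data nodes
// conf2.set("es.nodes.wan.only", "true");  // if this build has it: talk only to the declared es.nodes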