Error writing to ES with the Spark 2.0 release

Hi,

I ran into a problem when writing an RDD to ES with the Spark 2.0.0 release, but the same code works fine with Spark 1.6.1.

Here is my test code:

import org.elasticsearch.spark._

val conf = new org.apache.spark.SparkConf(true)
conf.set("pushdown", "true")
conf.set("es.node", "localhost")
conf.set("es.port", "9200")
conf.set("es.http.timeout", "5m")

val sc = new org.apache.spark.SparkContext("local", "shell", conf)

val sqlContext = new org.apache.spark.sql.SQLContext(sc)

val ani = Seq(("panada", 1), ("elephant", 2))
val ardd = sc.parallelize(ani)
val esnf = ardd.map(x => Map("Name" -> x._1, "num" -> x._2))
esnf.saveToEs("spark11/docs")

The sbt build dependency settings are:

libraryDependencies ++= Seq(
("org.apache.spark" % "spark-core_2.11" % "2.0.0"),
("org.apache.spark" % "spark-sql_2.11" % "2.0.0"),
("org.elasticsearch" % "elasticsearch-spark_2.11" % "2.3.3")
)

and the reported error is:

16/08/02 14:49:17 INFO Version: Elasticsearch Hadoop v2.3.3 [d284980369]
16/08/02 14:49:18 INFO EsRDDWriter: Writing to [spark11/docs]
16/08/02 14:49:18 ERROR Executor: Exception in task 0.0 in stage 0.0 (TID 0)
java.lang.NoSuchMethodError: org.apache.spark.TaskContext.addOnCompleteCallback(Lscala/Function0;)V
at org.elasticsearch.spark.rdd.EsRDDWriter.write(EsRDDWriter.scala:42)
at org.elasticsearch.spark.rdd.EsSpark$$anonfun$doSaveToEs$1.apply(EsSpark.scala:84)
at org.elasticsearch.spark.rdd.EsSpark$$anonfun$doSaveToEs$1.apply(EsSpark.scala:84)
at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:70)
at org.apache.spark.scheduler.Task.run(Task.scala:85)
at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:274)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
at java.lang.Thread.run(Thread.java:745)
16/08/02 14:49:18 ERROR SparkUncaughtExceptionHandler: Uncaught exception in thread Thread[Executor task launch worker-0,5,run-main-group-0]
java.lang.NoSuchMethodError: org.apache.spark.TaskContext.addOnCompleteCallback(Lscala/Function0;)V
at org.elasticsearch.spark.rdd.EsRDDWriter.write(EsRDDWriter.scala:42)
at org.elasticsearch.spark.rdd.EsSpark$$anonfun$doSaveToEs$1.apply(EsSpark.scala:84)
at org.elasticsearch.spark.rdd.EsSpark$$anonfun$doSaveToEs$1.apply(EsSpark.scala:84)
at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:70)
at org.apache.spark.scheduler.Task.run(Task.scala:85)
at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:274)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
at java.lang.Thread.run(Thread.java:745)
16/08/02 14:49:18 ERROR ContextCleaner: Error in cleaning thread
java.lang.InterruptedException
at java.lang.Object.wait(Native Method)
at java.lang.ref.ReferenceQueue.remove(ReferenceQueue.java:143)
at org.apache.spark.ContextCleaner$$anonfun$org$apache$spark$ContextCleaner$$keepCleaning$1.apply$mcV$sp(ContextCleaner.scala:175)
at org.apache.spark.util.Utils$.tryOrStopSparkContext(Utils.scala:1229)
at org.apache.spark.ContextCleaner.org$apache$spark$ContextCleaner$$keepCleaning(ContextCleaner.scala:172)
at org.apache.spark.ContextCleaner$$anon$1.run(ContextCleaner.scala:67)
16/08/02 14:49:18 WARN TaskSetManager: Lost task 0.0 in stage 0.0 (TID 0, localhost): java.lang.NoSuchMethodError: org.apache.spark.TaskContext.addOnCompleteCallback(Lscala/Function0;)V
at org.elasticsearch.spark.rdd.EsRDDWriter.write(EsRDDWriter.scala:42)
at org.elasticsearch.spark.rdd.EsSpark$$anonfun$doSaveToEs$1.apply(EsSpark.scala:84)
at org.elasticsearch.spark.rdd.EsSpark$$anonfun$doSaveToEs$1.apply(EsSpark.scala:84)
at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:70)
at org.apache.spark.scheduler.Task.run(Task.scala:85)
at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:274)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
at java.lang.Thread.run(Thread.java:745)

Has anybody met the same problem with the Spark 2.0 release?

Thanks.

I am facing the same issue. Let me know if any workaround exists.

It seems that you are using version 2.3.3, which only supports Spark up to 1.6.1. If you would like Spark 2.0 compatibility, you will need to switch to the 5.0.0-alpha5 version. Please note that this is an alpha release and should be used for testing purposes only (i.e. not in production)!

There is no build of alpha5 for spark_2.11 yet. There is a build for elasticsearch-hadoop, but that one is built with Scala 2.10.

I am also facing the same problem, even after updating the dependency to "org.elasticsearch" % "elasticsearch-spark_2.11" % "5.0.0-alpha4".

@james.baiera The 5.0.0-alpha5 version is not available in Maven.

Because Spark 2.0 is incompatible with Spark 1.3-1.6, we split the single Spark artifact into two separate ones. The Spark 2.0/Scala 2.11 artifact can be found here: https://mvnrepository.com/artifact/org.elasticsearch/elasticsearch-spark-20_2.11

The artifact for the old Spark 1.3-1.6 support level can be found here:
https://mvnrepository.com/artifact/org.elasticsearch/elasticsearch-spark-13_2.11

The Scala 2.10 versions of the standalone alpha5 Spark support can be found here:
https://mvnrepository.com/artifact/org.elasticsearch/elasticsearch-spark-20_2.10
and here:
https://mvnrepository.com/artifact/org.elasticsearch/elasticsearch-spark-13_2.10
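
For reference, a minimal sbt dependency block for the Spark 2.0/Scala 2.11 combination, assuming the 5.0.0-alpha5 build of the split artifact (adjust the version to whatever build actually shows up on the Maven page above):

libraryDependencies ++= Seq(
  ("org.apache.spark" % "spark-core_2.11" % "2.0.0"),
  ("org.apache.spark" % "spark-sql_2.11" % "2.0.0"),
  ("org.elasticsearch" % "elasticsearch-spark-20_2.11" % "5.0.0-alpha5")
)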

@james.baiera Thanks

@james.baiera

I am stuck while writing a JavaRDD to Elasticsearch
using the library from https://mvnrepository.com/artifact/org.elasticsearch/elasticsearch-spark-20_2.11

JavaEsSpark.saveToEs(myrdd, "temp_index"); throws an error saying it is not able to connect to the cluster.

However, when I create a map for es.nodes and es.port, it doesn't throw the above error:
final HashMap<String, String> ESmap = new HashMap<>();
ESmap.put("es.nodes", "ip");
ESmap.put("es.port", "9200");

JavaEsSpark.saveToEs(final_stats, "temp_index", ESmap);

But it never writes, and the task is stuck at that stage.

There is no exception thrown either.

The Spark build is for Scala 2.11, with Java 8.
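
For reference, a minimal Scala sketch of the same write path, with es.nodes/es.port set once on the SparkConf instead of passed to every saveToEs call (the host, app name, and index/type below are placeholders, not values from this thread):

import org.apache.spark.{SparkConf, SparkContext}
import org.elasticsearch.spark._

// Placeholder host; point es.nodes at a reachable node of your cluster.
val conf = new SparkConf()
  .setAppName("es-write-test")
  .setMaster("local[*]")
  .set("es.nodes", "10.0.0.5")
  .set("es.port", "9200")

val sc = new SparkContext(conf)

// Two trivial documents, just to confirm the connection and the write path.
val docs = sc.parallelize(Seq(
  Map("Name" -> "panda", "num" -> 1),
  Map("Name" -> "elephant", "num" -> 2)))

docs.saveToEs("temp_index/docs")

If the write still hangs without an exception, it is usually worth checking that the Spark executors themselves, not just the driver, can reach the es.nodes address on port 9200.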

Sorry,

I am facing the same problem, but I can't find a solution to the issue above.

Is there any way to solve it with Spark 2.x and ES 2.x?