Error writing to ES with the Spark 2.0 release


#1

Hi,

I ran into a problem when writing an RDD to ES with the Spark 2.0.0 release, but it works well with Spark 1.6.1.

Here is my test code:

import org.elasticsearch.spark._

val conf = new org.apache.spark.SparkConf(true)
conf.set("pushdown", "true")
conf.set("es.nodes", "localhost")   // note: the setting is "es.nodes", not "es.node"
conf.set("es.port", "9200")
conf.set("es.http.timeout", "5m")

val sc = new org.apache.spark.SparkContext("local", "shell", conf)

val sqlContext = new org.apache.spark.sql.SQLContext(sc)

val ani = Seq(("panda", 1), ("elephant", 2))
val ardd = sc.parallelize(ani)
val esnf = ardd.map(x => Map("Name" -> x._1, "num" -> x._2))
esnf.saveToEs("spark11/docs")

The sbt dependency settings are:

libraryDependencies ++= Seq(
("org.apache.spark" % "spark-core_2.11" % "2.0.0"),
("org.apache.spark" % "spark-sql_2.11" % "2.0.0"),
("org.elasticsearch" % "elasticsearch-spark_2.11" % "2.3.3")
)

and the reported error is:

16/08/02 14:49:17 INFO Version: Elasticsearch Hadoop v2.3.3 [d284980369]
16/08/02 14:49:18 INFO EsRDDWriter: Writing to [spark11/docs]
16/08/02 14:49:18 ERROR Executor: Exception in task 0.0 in stage 0.0 (TID 0)
java.lang.NoSuchMethodError: org.apache.spark.TaskContext.addOnCompleteCallback(Lscala/Function0;)V
at org.elasticsearch.spark.rdd.EsRDDWriter.write(EsRDDWriter.scala:42)
at org.elasticsearch.spark.rdd.EsSpark$$anonfun$doSaveToEs$1.apply(EsSpark.scala:84)
at org.elasticsearch.spark.rdd.EsSpark$$anonfun$doSaveToEs$1.apply(EsSpark.scala:84)
at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:70)
at org.apache.spark.scheduler.Task.run(Task.scala:85)
at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:274)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
at java.lang.Thread.run(Thread.java:745)
16/08/02 14:49:18 ERROR SparkUncaughtExceptionHandler: Uncaught exception in thread Thread[Executor task launch worker-0,5,run-main-group-0]
java.lang.NoSuchMethodError: org.apache.spark.TaskContext.addOnCompleteCallback(Lscala/Function0;)V
at org.elasticsearch.spark.rdd.EsRDDWriter.write(EsRDDWriter.scala:42)
at org.elasticsearch.spark.rdd.EsSpark$$anonfun$doSaveToEs$1.apply(EsSpark.scala:84)
at org.elasticsearch.spark.rdd.EsSpark$$anonfun$doSaveToEs$1.apply(EsSpark.scala:84)
at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:70)
at org.apache.spark.scheduler.Task.run(Task.scala:85)
at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:274)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
at java.lang.Thread.run(Thread.java:745)
16/08/02 14:49:18 ERROR ContextCleaner: Error in cleaning thread
java.lang.InterruptedException
at java.lang.Object.wait(Native Method)
at java.lang.ref.ReferenceQueue.remove(ReferenceQueue.java:143)
at org.apache.spark.ContextCleaner$$anonfun$org$apache$spark$ContextCleaner$$keepCleaning$1.apply$mcV$sp(ContextCleaner.scala:175)
at org.apache.spark.util.Utils$.tryOrStopSparkContext(Utils.scala:1229)
at org.apache.spark.ContextCleaner.org$apache$spark$ContextCleaner$$keepCleaning(ContextCleaner.scala:172)
at org.apache.spark.ContextCleaner$$anon$1.run(ContextCleaner.scala:67)
16/08/02 14:49:18 WARN TaskSetManager: Lost task 0.0 in stage 0.0 (TID 0, localhost): java.lang.NoSuchMethodError: org.apache.spark.TaskContext.addOnCompleteCallback(Lscala/Function0;)V
at org.elasticsearch.spark.rdd.EsRDDWriter.write(EsRDDWriter.scala:42)
at org.elasticsearch.spark.rdd.EsSpark$$anonfun$doSaveToEs$1.apply(EsSpark.scala:84)
at org.elasticsearch.spark.rdd.EsSpark$$anonfun$doSaveToEs$1.apply(EsSpark.scala:84)
at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:70)
at org.apache.spark.scheduler.Task.run(Task.scala:85)
at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:274)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
at java.lang.Thread.run(Thread.java:745)

Has anybody met the same problem with the Spark 2.0 release?

Thanks.


(Saurabh Malviya) #2

I am facing the same issue. Let me know if any workaround exists.


(James Baiera) #3

It seems that you are using version 2.3.3, which only supports up to Spark 1.6.1. If you would like Spark 2.0 compatibility, you will need to switch to the 5.0.0-alpha5 version. Please note that this is an alpha release and should be used for testing purposes only (i.e. not in production)!


(Saurabh Malviya) #4

There is no alpha5 build for spark_2.11 yet. There is a build for elasticsearch-hadoop, but that is built with Scala 2.10.


(Satendra Kumar) #5

I am also facing the same problem, even after updating the version to "org.elasticsearch" % "elasticsearch-spark_2.11" % "5.0.0-alpha4".

@james.baiera The 5.0.0-alpha5 version is not available in Maven.


(James Baiera) #6

Because Spark 2.0 is incompatible with Spark 1.3-1.6, we split the single Spark artifact into two separate ones. The Spark 2.0 / Scala 2.11 artifact can be found here: https://mvnrepository.com/artifact/org.elasticsearch/elasticsearch-spark-20_2.11

The artifact for the old Spark 1.3-1.6 support level can be found here:
https://mvnrepository.com/artifact/org.elasticsearch/elasticsearch-spark-13_2.11

The Scala 2.10 versions of the standalone alpha5 Spark support can be found here:
https://mvnrepository.com/artifact/org.elasticsearch/elasticsearch-spark-20_2.10
and here:
https://mvnrepository.com/artifact/org.elasticsearch/elasticsearch-spark-13_2.10
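
For reference, updating the sbt settings from the first post to the new split artifact would look roughly like this (a sketch, assuming Scala 2.11 and the 5.0.0-alpha5 release mentioned above; pick whichever build is actually published for your Scala version):

libraryDependencies ++= Seq(
  "org.apache.spark" % "spark-core_2.11" % "2.0.0",
  "org.apache.spark" % "spark-sql_2.11" % "2.0.0",
  // Spark 2.0 / Scala 2.11 connector from the first link above
  "org.elasticsearch" % "elasticsearch-spark-20_2.11" % "5.0.0-alpha5"
)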


(Satendra Kumar) #7

@james.baiera Thanks


(Ankur Gupta) #8

@james.baiera

I am stuck while writing a JavaRDD to Elasticsearch using the library from https://mvnrepository.com/artifact/org.elasticsearch/elasticsearch-spark-20_2.11

JavaEsSpark.saveToEs(myrdd, "temp_index"); throws an error saying it is not able to connect to the cluster.

However, when I create a map with es.nodes and es.port, it doesn't throw the above error:

final HashMap<String, String> ESmap = new HashMap<>();
ESmap.put("es.nodes", "ip");
ESmap.put("es.port", "9200");

JavaEsSpark.saveToEs(final_stats, "temp_index", ESmap);

but it never writes and the task is stuck at that stage.

No exception is thrown either.

Scala version is 2.11, Java 8.
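
For reference, the same connection settings can also go on the SparkConf itself instead of a per-call map. A minimal Scala sketch along the lines of the example at the top of the thread (host, index and type names here are placeholders):

import org.apache.spark.{SparkConf, SparkContext}
import org.elasticsearch.spark._

// connection settings on the SparkConf rather than passed per call
val conf = new SparkConf().setMaster("local[*]").setAppName("es-write")
conf.set("es.nodes", "ip")    // placeholder for the actual host, as above
conf.set("es.port", "9200")

val sc = new SparkContext(conf)
sc.makeRDD(Seq(Map("Name" -> "panda", "num" -> 1)))
  .saveToEs("temp_index/docs")   // "index/type" resource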


(new to Es) #9

Sorry,

I am facing the same problem, but I can't find a method above that solves the issue.

Is there any way to solve it with Spark 2.x and ES 2.x?


(system) #10