Spark to ES NoSuchMethodError - DataFrameFieldExtractor.extractField

(Richard Morwood) #1

Hello! I am trying to figure out how to upsert rows to Elasticsearch from Spark.
I am using elasticsearch-spark-20_2.10-5.5.1.jar on a Spark cluster running Spark 2.1 with Scala 2.1.

My DataFrame's printSchema() output looks like this:

 |-- datasetid: integer (nullable = false)
 |-- name: string (nullable = false)

The code I am using to write the DataFrame looks like this:

var ElasticSearchHost = ""
var esConfig:Map[String,String] = Map("es.nodes" -> ElasticSearchHost,
                                      "es.port" -> "443",
                                      "es.nodes.wan.only" -> "true",
                                      "" -> "true",
                                      "" -> "datasetid",
                                      "es.write.operation" -> "upsert")


This returns the following error:

org.apache.spark.SparkException: Job aborted due to stage failure: Task 28 in stage 87.0 failed 4 times, most recent failure: Lost task 28.3 in stage 87.0 (TID 3943,, executor 0): java.lang.NoSuchMethodError: scala.runtime.ObjectRef.create(Ljava/lang/Object;)Lscala/runtime/ObjectRef;
	at org.elasticsearch.spark.sql.DataFrameFieldExtractor.extractField(DataFrameFieldExtractor.scala:29)
	at org.elasticsearch.hadoop.serialization.field.ConstantFieldExtractor.field(
	at org.elasticsearch.hadoop.serialization.bulk.AbstractBulkFactory$FieldWriter.write(
	at org.elasticsearch.hadoop.serialization.bulk.TemplatedBulk.writeTemplate(
	at org.elasticsearch.hadoop.serialization.bulk.TemplatedBulk.write(
	at org.elasticsearch.spark.rdd.EsRDDWriter.write(EsRDDWriter.scala:67)
	at org.elasticsearch.spark.sql.EsSparkSQL$$anonfun$saveToEs$1.apply(EsSparkSQL.scala:94)
	at org.elasticsearch.spark.sql.EsSparkSQL$$anonfun$saveToEs$1.apply(EsSparkSQL.scala:94)
	at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:87)
	at org.apache.spark.executor.Executor$
	at java.util.concurrent.ThreadPoolExecutor.runWorker(
	at java.util.concurrent.ThreadPoolExecutor$

How do I pass in the ID field of my DataFrame? If I take out the ID mapping setting and es.write.operation, it writes without issue, but Elasticsearch then generates the ID itself, which leaves me unable to update entries.

If there is any further information I can provide, please let me know and I'll get right on it.

(eliasah) #2

I'd definitely check the versions of the ES cluster, Spark cluster, JDK and Scala, along with the dependencies you are using in your project. They don't seem to be compatible.

(James Baiera) #3

Do you mean Scala 2.10 or 2.11? This error almost always indicates that you're using a 2.10 artifact with a 2.11 Scala distribution.
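
A quick way to confirm this kind of mismatch is to compare the Scala binary version the cluster is actually running with the suffix in the connector's artifact name. The snippet below is only a minimal sketch: the object and method names are made up for illustration, and it assumes the jar follows the usual `_<scalaBinaryVersion>-<version>` naming.

```scala
import scala.util.Properties

// Hypothetical helper, not part of Spark or elasticsearch-hadoop.
object ScalaVersionCheck {
  // Binary version of the running Scala library, e.g. "2.10" from "2.10.6".
  def runtimeBinaryVersion: String =
    Properties.versionNumberString.split('.').take(2).mkString(".")

  // Extracts the Scala binary suffix from an artifact name such as
  // "elasticsearch-spark-20_2.10-5.5.1.jar"; None if no suffix is found.
  def artifactBinaryVersion(jarName: String): Option[String] = {
    val Suffix = """_(\d+\.\d+)-""".r
    Suffix.findFirstMatchIn(jarName).map(_.group(1))
  }

  // True only when the artifact suffix equals the running binary version.
  def matches(jarName: String): Boolean =
    artifactBinaryVersion(jarName).exists(_ == runtimeBinaryVersion)
}
```

Pasted into spark-shell, `ScalaVersionCheck.matches("elasticsearch-spark-20_2.10-5.5.1.jar")` reports whether that jar lines up with the Scala version on the driver. Executors normally run the same Scala build as the driver, but that is worth verifying separately on a custom deployment.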

(Richard Morwood) #4

Silly me, version mismatch. I should have checked into that.
Definitely running Scala 2.10, not 2.11.

Now using elasticsearch-spark-20_2.10-5.5.1.jar and it's going smoothly.
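
For anyone landing here later, this is roughly what an upsert configuration keyed on a DataFrame column can look like. It is a sketch under assumptions: `es.mapping.id` is the connector setting for choosing the document ID field, and it is a guess that this was the blank key paired with `datasetid` in the original post. The other blank key is left out because the thread does not say what it was. The object name, index name and host are hypothetical.

```scala
// Hypothetical helper that only builds the settings map; it does not
// need Spark or Elasticsearch on the classpath.
object EsUpsertConfig {
  // idColumn is the DataFrame column whose value becomes the document ID.
  // "es.mapping.id" is an assumption about the blank key in the post above.
  def build(host: String, idColumn: String): Map[String, String] = Map(
    "es.nodes"           -> host,
    "es.port"            -> "443",
    "es.nodes.wan.only"  -> "true",
    "es.mapping.id"      -> idColumn,
    "es.write.operation" -> "upsert"
  )
}

// Possible usage on a cluster with the connector jar available
// (index/type name is made up for illustration):
//   import org.elasticsearch.spark.sql._
//   df.saveToEs("datasets/dataset", EsUpsertConfig.build(host, "datasetid"))
```

With the ID taken from `datasetid`, writing the same row again should update the existing document rather than create a new one with a generated ID.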

Thanks for the assist!
