Spark to ES NoSuchMethodError - DataFrameFieldExtractor.extractField

Hello! I am trying to figure out how to upsert rows into Elasticsearch from Spark.
I am using elasticsearch-spark-20_2.11-5.5.1.jar, on a Spark cluster running Spark 2.1 with Scala 2.1.

My DataFrame printSchema() looks like this:

root
 |-- datasetid: integer (nullable = false)
 |-- name: string (nullable = false)

The code I am using to write the DataFrame looks like this:

val ElasticSearchHost = "http://url-to-elasticsearch.com"
val esConfig: Map[String, String] = Map(
  "es.nodes" -> ElasticSearchHost,
  "es.port" -> "443",
  "es.nodes.wan.only" -> "true",
  "es.net.ssl" -> "true",
  "es.mapping.id" -> "datasetid",
  "es.write.operation" -> "upsert")

dfSet1.write.format("org.elasticsearch.spark.sql")
        .options(esConfig)
        .save("indexofstuff/persons")

This returns the following error:

org.apache.spark.SparkException: Job aborted due to stage failure: Task 28 in stage 87.0 failed 4 times, most recent failure: Lost task 28.3 in stage 87.0 (TID 3943, 10.164.225.89, executor 0): java.lang.NoSuchMethodError: scala.runtime.ObjectRef.create(Ljava/lang/Object;)Lscala/runtime/ObjectRef;
	at org.elasticsearch.spark.sql.DataFrameFieldExtractor.extractField(DataFrameFieldExtractor.scala:29)
	at org.elasticsearch.hadoop.serialization.field.ConstantFieldExtractor.field(ConstantFieldExtractor.java:36)
	at org.elasticsearch.hadoop.serialization.bulk.AbstractBulkFactory$FieldWriter.write(AbstractBulkFactory.java:94)
	at org.elasticsearch.hadoop.serialization.bulk.TemplatedBulk.writeTemplate(TemplatedBulk.java:80)
	at org.elasticsearch.hadoop.serialization.bulk.TemplatedBulk.write(TemplatedBulk.java:56)
	at org.elasticsearch.hadoop.rest.RestRepository.writeToIndex(RestRepository.java:159)
	at org.elasticsearch.spark.rdd.EsRDDWriter.write(EsRDDWriter.scala:67)
	at org.elasticsearch.spark.sql.EsSparkSQL$$anonfun$saveToEs$1.apply(EsSparkSQL.scala:94)
	at org.elasticsearch.spark.sql.EsSparkSQL$$anonfun$saveToEs$1.apply(EsSparkSQL.scala:94)
	at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:87)
	at org.apache.spark.scheduler.Task.run(Task.scala:99)
	at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:322)
	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
	at java.lang.Thread.run(Thread.java:745)

How do I pass in the ID field of my DataFrame? If I take out es.mapping.id and es.write.operation it writes without issue, but then Elasticsearch generates the IDs, which leaves me unable to update existing entries.

If there is any further information I can provide, please let me know and I'll get right on it.

I'd definitely check the versions of your ES cluster, Spark cluster, JDK, and Scala against the dependencies you are using in your project. They don't seem to be compatible.
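
If it helps, a quick sanity check is to paste something like this into spark-shell on the cluster (a minimal sketch; the versions in the comments are only examples):

// Print the Scala, Spark and JDK versions the cluster is actually running
println(scala.util.Properties.versionString)    // e.g. "version 2.10.6"
println(org.apache.spark.SPARK_VERSION)         // e.g. "2.1.1"
println(System.getProperty("java.version"))     // e.g. "1.8.0_131"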

Do you mean Scala 2.10 or 2.11? This error almost always indicates a Scala binary-version mismatch: scala.runtime.ObjectRef.create only exists as of Scala 2.11, so a 2.11 artifact running on a 2.10 Scala distribution fails exactly like this.
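
If you build with sbt, a minimal build.sbt sketch that keeps the artifact suffix in lock-step with the cluster's Scala version (%% appends the Scala binary version for you; the versions shown assume a 2.10 cluster):

// build.sbt (sketch): %% resolves these to the _2.10 artifacts
scalaVersion := "2.10.6"
libraryDependencies += "org.elasticsearch" %% "elasticsearch-spark-20" % "5.5.1"
libraryDependencies += "org.apache.spark"  %% "spark-sql"              % "2.1.1" % "provided"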

Silly me, a version mismatch. I should have checked into that.
Definitely running Scala 2.10, not 2.11.

Now using elasticsearch-spark-20_2.10-5.5.1.jar and it's going smoothly.
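
In case anyone else lands here: one way to confirm which connector jar the JVM actually loaded (assuming nothing unusual in the classloader setup) is:

// Print where the connector's DefaultSource class was loaded from
println(classOf[org.elasticsearch.spark.sql.DefaultSource]
  .getProtectionDomain.getCodeSource.getLocation)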

Thanks for the assist!
