Spark to ES NoSuchMethodError - DataFrameFieldExtractor.extractField


(Richard Morwood) #1

Hello! I am trying to figure out how to upsert rows to Elasticsearch from Spark.
I am using elasticsearch-spark-20_2.10-5.5.1.jar on a Spark cluster running Spark 2.1 with Scala 2.1

My DataFrame printSchema() looks like this:

root
 |-- datasetid: integer (nullable = false)
 |-- name: string (nullable = false)

The code I am using to write the DataFrame looks like this:

val elasticSearchHost = "http://url-to-elasticsearch.com"
val esConfig: Map[String, String] = Map("es.nodes" -> elasticSearchHost,
                                        "es.port" -> "443",
                                        "es.nodes.wan.only" -> "true",
                                        "es.net.ssl" -> "true",
                                        "es.mapping.id" -> "datasetid",
                                        "es.write.operation" -> "upsert")

dfSet1.write.format("org.elasticsearch.spark.sql")
        .options(esConfig)
        .save("indexofstuff/persons")

This is returning the following error:

org.apache.spark.SparkException: Job aborted due to stage failure: Task 28 in stage 87.0 failed 4 times, most recent failure: Lost task 28.3 in stage 87.0 (TID 3943, 10.164.225.89, executor 0): java.lang.NoSuchMethodError: scala.runtime.ObjectRef.create(Ljava/lang/Object;)Lscala/runtime/ObjectRef;
	at org.elasticsearch.spark.sql.DataFrameFieldExtractor.extractField(DataFrameFieldExtractor.scala:29)
	at org.elasticsearch.hadoop.serialization.field.ConstantFieldExtractor.field(ConstantFieldExtractor.java:36)
	at org.elasticsearch.hadoop.serialization.bulk.AbstractBulkFactory$FieldWriter.write(AbstractBulkFactory.java:94)
	at org.elasticsearch.hadoop.serialization.bulk.TemplatedBulk.writeTemplate(TemplatedBulk.java:80)
	at org.elasticsearch.hadoop.serialization.bulk.TemplatedBulk.write(TemplatedBulk.java:56)
	at org.elasticsearch.hadoop.rest.RestRepository.writeToIndex(RestRepository.java:159)
	at org.elasticsearch.spark.rdd.EsRDDWriter.write(EsRDDWriter.scala:67)
	at org.elasticsearch.spark.sql.EsSparkSQL$$anonfun$saveToEs$1.apply(EsSparkSQL.scala:94)
	at org.elasticsearch.spark.sql.EsSparkSQL$$anonfun$saveToEs$1.apply(EsSparkSQL.scala:94)
	at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:87)
	at org.apache.spark.scheduler.Task.run(Task.scala:99)
	at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:322)
	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
	at java.lang.Thread.run(Thread.java:745)

How do I pass in the ID field of my DataFrame? If I take out es.mapping.id and es.write.operation it writes without issue, but then Elasticsearch generates the IDs itself, which means I cannot update entries later.

Any further information I can provide, please let me know and I'll get right on it.


(eliasah) #2

I'd definitely check that the versions of your ES cluster, Spark cluster, JDK, and Scala are compatible with the dependencies you are using in your project. They don't seem to be.


(James Baiera) #3

Do you mean Scala 2.10 or 2.11? This error almost always indicates that you're using a 2.10 artifact with a 2.11 Scala distribution.
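The artifact's target Scala version is encoded in its file name (the suffix after the underscore), so it is easy to compare against the "Using Scala version ..." line that `spark-submit --version` prints for your cluster. A minimal shell sketch of pulling the suffix out of the jar name (the jar from the original post is used as the example):

```shell
# elasticsearch-hadoop artifacts are named elasticsearch-spark-20_<scala>-<es>.jar
jar="elasticsearch-spark-20_2.10-5.5.1.jar"

suffix="${jar#*_}"          # drop everything through the underscore -> "2.10-5.5.1.jar"
scala_ver="${suffix%%-*}"   # keep everything before the first dash  -> "2.10"
echo "artifact built for Scala $scala_ver"
```

If that value does not match the Scala version your Spark distribution was built with, you get exactly this kind of NoSuchMethodError at runtime rather than at compile time.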


(Richard Morwood) #4

Silly me, version mismatch. I should have checked into that.
Definitely running Scala 2.11, not 2.10.

Now using elasticsearch-spark-20_2.11-5.5.1.jar and it's going smoothly
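For anyone hitting the same error from a build tool rather than a hand-picked jar: the fix is keeping the artifact's Scala suffix in lockstep with the project's Scala version. A hedged build.sbt sketch (assuming an sbt project, which the original post does not specify):

```scala
// build.sbt sketch: the "_2.11" suffix of the artifact must match scalaVersion.
// Using a "_2.10" artifact on a Scala 2.11 build reproduces the
// scala.runtime.ObjectRef.create NoSuchMethodError shown above.
scalaVersion := "2.11.8"

libraryDependencies += "org.elasticsearch" % "elasticsearch-spark-20_2.11" % "5.5.1"
// or, equivalently, let sbt append the suffix from scalaVersion automatically:
// libraryDependencies += "org.elasticsearch" %% "elasticsearch-spark-20" % "5.5.1"
```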

Thanks for the assist!


(system) #5

This topic was automatically closed 28 days after the last reply. New replies are no longer allowed.