Hello! I am trying to figure out how to upsert rows into Elasticsearch from Spark.
I am using elasticsearch-spark-20_2.10-5.5.1.jar on a Spark cluster running Spark 2.1 with Scala 2.11.
The printSchema() output for my DataFrame looks like this:
root
|-- datasetid: integer (nullable = false)
|-- name: string (nullable = false)
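For reference, here is a minimal sketch that builds a DataFrame with exactly that schema (the values are made up, just to make the question reproducible):

import org.apache.spark.sql.{Row, SparkSession}
import org.apache.spark.sql.types.{IntegerType, StringType, StructField, StructType}

val spark = SparkSession.builder().getOrCreate()

// Both fields declared non-nullable to match the printSchema() output above.
val schema = StructType(Seq(
  StructField("datasetid", IntegerType, nullable = false),
  StructField("name", StringType, nullable = false)
))

// Sample rows only; the real data comes from elsewhere.
val dfSet1 = spark.createDataFrame(
  spark.sparkContext.parallelize(Seq(Row(1, "Alice"), Row(2, "Bob"))),
  schema
)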
The code I am using to write the DataFrame looks like this:
val ElasticSearchHost = "http://url-to-elasticsearch.com"
val esConfig: Map[String, String] = Map(
  "es.nodes" -> ElasticSearchHost,
  "es.port" -> "443",
  "es.nodes.wan.only" -> "true",
  "es.net.ssl" -> "true",
  "es.mapping.id" -> "datasetid",
  "es.write.operation" -> "upsert"
)
dfSet1.write.format("org.elasticsearch.spark.sql")
  .options(esConfig)
  .save("indexofstuff/persons")
This returns the following error:
org.apache.spark.SparkException: Job aborted due to stage failure: Task 28 in stage 87.0 failed 4 times, most recent failure: Lost task 28.3 in stage 87.0 (TID 3943, 10.164.225.89, executor 0): java.lang.NoSuchMethodError: scala.runtime.ObjectRef.create(Ljava/lang/Object;)Lscala/runtime/ObjectRef;
at org.elasticsearch.spark.sql.DataFrameFieldExtractor.extractField(DataFrameFieldExtractor.scala:29)
at org.elasticsearch.hadoop.serialization.field.ConstantFieldExtractor.field(ConstantFieldExtractor.java:36)
at org.elasticsearch.hadoop.serialization.bulk.AbstractBulkFactory$FieldWriter.write(AbstractBulkFactory.java:94)
at org.elasticsearch.hadoop.serialization.bulk.TemplatedBulk.writeTemplate(TemplatedBulk.java:80)
at org.elasticsearch.hadoop.serialization.bulk.TemplatedBulk.write(TemplatedBulk.java:56)
at org.elasticsearch.hadoop.rest.RestRepository.writeToIndex(RestRepository.java:159)
at org.elasticsearch.spark.rdd.EsRDDWriter.write(EsRDDWriter.scala:67)
at org.elasticsearch.spark.sql.EsSparkSQL$$anonfun$saveToEs$1.apply(EsSparkSQL.scala:94)
at org.elasticsearch.spark.sql.EsSparkSQL$$anonfun$saveToEs$1.apply(EsSparkSQL.scala:94)
at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:87)
at org.apache.spark.scheduler.Task.run(Task.scala:99)
at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:322)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
at java.lang.Thread.run(Thread.java:745)
How do I pass the ID field of my DataFrame through to Elasticsearch? If I take out es.mapping.id and es.write.operation, the write succeeds, BUT then Elasticsearch generates the document IDs itself, which means I cannot update entries later.
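For contrast, this is a sketch of the variant that does write successfully (the same config with the two keys dropped), at the cost of auto-generated IDs:

// Same write, but without the ID mapping and upsert mode; this
// version succeeds, with Elasticsearch choosing the document IDs.
val insertOnlyConfig = esConfig - "es.mapping.id" - "es.write.operation"

dfSet1.write.format("org.elasticsearch.spark.sql")
  .options(insertOnlyConfig)
  .save("indexofstuff/persons")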
If there is any further information I can provide, please let me know and I'll get right on it.