Spark SQL types are not handled through basic RDD saveToEs() calls

Given the following Dataset:

aDF
res33: org.apache.spark.sql.Dataset[org.apache.spark.sql.Row] = [device_id: string]

aDF.rdd
res34: org.apache.spark.rdd.RDD[org.apache.spark.sql.Row] = MapPartitionsRDD[129] at rdd at <console>:67

aDF.show()

+-------------------------------------------+                                   
|device_id                                  |
+-------------------------------------------+
|g.40                                       |
|a.E2                                       |
|a.39                                       |

I would expect to be able to save it to ES with the following command:

EsSpark.saveToEs(aDF.rdd, "production-devices-spark-shell/device")

16/11/18 15:28:22 WARN TaskSetManager: Lost task 0.0 in stage 86.0 (TID 3595, ip-10-1-181-210.aws.vrv): org.elasticsearch.hadoop.EsHadoopIllegalArgumentException: Spark SQL types are not handled through basic RDD saveToEs() calls; typically this is a mistake(as the SQL schema will be ignored). Use 'org.elasticsearch.spark.sql' package instead
	at org.elasticsearch.spark.serialization.ScalaValueWriter.org$elasticsearch$spark$serialization$ScalaValueWriter$$doWrite(ScalaValueWriter.scala:111)
	at org.elasticsearch.spark.serialization.ScalaValueWriter.write(ScalaValueWriter.scala:37)
	at org.elasticsearch.hadoop.serialization.builder.ContentBuilder.value(ContentBuilder.java:53)
	at org.elasticsearch.hadoop.serialization.bulk.TemplatedBulk.doWriteObject(TemplatedBulk.java:71)
	at org.elasticsearch.hadoop.serialization.bulk.TemplatedBulk.write(TemplatedBulk.java:58)
	at org.elasticsearch.hadoop.rest.RestRepository.writeToIndex(RestRepository.java:159)
	at org.elasticsearch.spark.rdd.EsRDDWriter.write(EsRDDWriter.scala:67)
	at org.elasticsearch.spark.rdd.EsSpark$$anonfun$doSaveToEs$1.apply(EsSpark.scala:102)
	at org.elasticsearch.spark.rdd.EsSpark$$anonfun$doSaveToEs$1.apply(EsSpark.scala:102)
	at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:70)
	at org.apache.spark.scheduler.Task.run(Task.scala:86)
	at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:274)
	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
	at java.lang.Thread.run(Thread.java:745)

The error message says "Use 'org.elasticsearch.spark.sql' package instead", but what exactly does that mean?

We provide native support for Spark SQL DataFrames and Datasets in ES-Hadoop. These types carry a schema and require serialization logic different from what the plain RDD writer uses, which is why the basic saveToEs() call rejects Row objects. Please take a look at the Spark SQL section of our documentation for the correct API calls.
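
Concretely, importing the implicits from the org.elasticsearch.spark.sql package adds a schema-aware saveToEs() directly to DataFrames and Datasets. A minimal sketch, assuming ES-Hadoop is on the classpath and the connector is already configured (es.nodes and friends on the SparkConf):

import org.elasticsearch.spark.sql._

// The implicit conversion from org.elasticsearch.spark.sql adds saveToEs()
// to Datasets/DataFrames; the SQL schema is used during serialization.
aDF.saveToEs("production-devices-spark-shell/device")

// Equivalent explicit form, without relying on the implicit conversion:
EsSparkSQL.saveToEs(aDF, "production-devices-spark-shell/device")

If you really want to stay on the plain RDD API, the alternative is to map each Row to native Scala types first, for example (using the device_id column from your schema):

import org.elasticsearch.spark.rdd.EsSpark

// Convert each Row to a Map so the plain RDD writer can serialize it.
val plainRdd = aDF.rdd.map(row => Map("device_id" -> row.getString(0)))
EsSpark.saveToEs(plainRdd, "production-devices-spark-shell/device")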
