Cannot extract value from entity

I've had this script running for some time, but it suddenly started throwing an error. The error message makes it look like device_id is a nested array in my table, but after inspecting the table it is not.

def esConfig(): Map[String,String] = {
  Map("es.mapping.id" -> "device_id")
}
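For context, es.mapping.id tells elasticsearch-hadoop which field of each record supplies the Elasticsearch document _id. Connection settings can live in the same map; a minimal sketch, where the es.nodes and es.port values are placeholder assumptions, not taken from my setup:

```scala
// Sketch of a fuller config map; the connection values are assumptions.
def esConfigWithNodes(): Map[String, String] = Map(
  "es.mapping.id" -> "device_id", // field supplying the document _id
  "es.nodes"      -> "localhost", // assumption: point at your cluster
  "es.port"       -> "9200"       // assumption: default ES HTTP port
)
```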

My Table

aDF.show()
+---------+
|device_id|
+---------+
|g.40     |
|a.E2     |
+---------+

Running saveToEs throws this error: "[ScalaMapFieldExtractor for field [[device_id]]] cannot extract value from entity [class java.lang.String]"

EsSpark.saveToEs(aDF.rdd, "production-devices-spark-shell/device", esConfig)

16/11/18 15:20:06 WARN TaskSetManager: Lost task 10.0 in stage 79.0 (TID 3312, ip-10-1-181-210.aws.vrv): org.elasticsearch.hadoop.EsHadoopIllegalArgumentException: [ScalaMapFieldExtractor for field [[device_id]]] cannot extract value from entity [class java.lang.String] | instance [[ds.289d90c249cec7becf3cfde39f0bbe028fcf4ce4]]
	at org.elasticsearch.hadoop.serialization.bulk.AbstractBulkFactory$FieldWriter.write(AbstractBulkFactory.java:97)
	at org.elasticsearch.hadoop.serialization.bulk.TemplatedBulk.writeTemplate(TemplatedBulk.java:80)
	at org.elasticsearch.hadoop.serialization.bulk.TemplatedBulk.write(TemplatedBulk.java:56)
	at org.elasticsearch.hadoop.rest.RestRepository.writeToIndex(RestRepository.java:159)
	at org.elasticsearch.spark.rdd.EsRDDWriter.write(EsRDDWriter.scala:67)
	at org.elasticsearch.spark.rdd.EsSpark$$anonfun$doSaveToEs$1.apply(EsSpark.scala:102)
	at org.elasticsearch.spark.rdd.EsSpark$$anonfun$doSaveToEs$1.apply(EsSpark.scala:102)
	at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:70)
	at org.apache.spark.scheduler.Task.run(Task.scala:86)
	at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:274)
	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
	at java.lang.Thread.run(Thread.java:745)

This issue was resolved by mapping my data set to a Device case class. My script had this operation further up the lineage chain; it appears the mapping must be the last step before handing the RDD off to EsSpark.

case class Device(device_id: String)

val esDF = aDF.map {
  case Row(device_id: String) => Device(device_id)
}
EsSpark.saveToEs(esDF.rdd, "devices-spark-shell/device", esConfig)
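For anyone hitting the same thing: the case class works because es-hadoop resolves es.mapping.id by field name, and a bare String has no device_id field to extract, which is exactly what the error is saying. An RDD of Map[String, Any] records is another shape saveToEs accepts; a hedged sketch of building such records (toRecords is a made-up helper name, not part of any API):

```scala
// A plain String exposes no field named device_id, so the extractor fails.
// A Map keyed by the field name (like a case class) gives it something to read.
def toRecords(ids: Seq[String]): Seq[Map[String, Any]] =
  ids.map(id => Map("device_id" -> id))

// Then, assuming a SparkContext named sc:
//   EsSpark.saveToEs(sc.parallelize(toRecords(ids)),
//                    "devices-spark-shell/device", esConfig)
```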
