I have a simple Spark DataFrame containing a column of JSON strings. When I run the following code:
import org.elasticsearch.spark._
val df = Seq("""{"name": "john", "age": 44}""").toDF("json")
df.rdd.map(x => x.getAs[String]("json")).saveToEs("test-index-df/post")
I get the error below:
17/08/13 21:51:58 ERROR TaskContextImpl: Error in TaskCompletionListener
org.elasticsearch.hadoop.rest.EsHadoopInvalidRequest: Found unrecoverable error [100.127.0.5:9200] returned Bad Request(400) - failed to parse; Bailing out..
at org.elasticsearch.hadoop.rest.RestClient.processBulkResponse(RestClient.java:251)
at org.elasticsearch.hadoop.rest.RestClient.bulk(RestClient.java:203)
at org.elasticsearch.hadoop.rest.RestRepository.tryFlush(RestRepository.java:220)
at org.elasticsearch.hadoop.rest.RestRepository.flush(RestRepository.java:242)
at org.elasticsearch.hadoop.rest.RestRepository.close(RestRepository.java:267)
at org.elasticsearch.hadoop.rest.RestService$PartitionWriter.close(RestService.java:120)
at org.elasticsearch.spark.rdd.EsRDDWriter$$anonfun$write$1.apply(EsRDDWriter.scala:60)
at org.elasticsearch.spark.rdd.EsRDDWriter$$anonfun$write$1.apply(EsRDDWriter.scala:60)
at org.apache.spark.TaskContext$$anon$1.onTaskCompletion(TaskContext.scala:123)
at org.apache.spark.TaskContextImpl$$anonfun$markTaskCompleted$1.apply(TaskContextImpl.scala:97)
at org.apache.spark.TaskContextImpl$$anonfun$markTaskCompleted$1.apply(TaskContextImpl.scala:95)
at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:48)
Does anyone have any idea what I'm doing wrong?
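
For completeness, here is the full setup as best I can reconstruct it. The connector version is a placeholder (whatever matches the cluster), the ES address is taken from the log above, and spark.implicits._ is auto-imported when using spark-shell:

// Shell launched with something like (exact connector version omitted):
// spark-shell --packages org.elasticsearch:elasticsearch-spark-20_2.11:<version> \
//   --conf spark.es.nodes=100.127.0.5 --conf spark.es.port=9200

import org.elasticsearch.spark._  // brings saveToEs into scope on RDDs
import spark.implicits._          // for toDF (already in scope in spark-shell)

val df = Seq("""{"name": "john", "age": 44}""").toDF("json")

// Each row holds one JSON document as a plain String; the intent is for
// each string to be indexed as its own document in test-index-df/post.
df.rdd.map(x => x.getAs[String]("json")).saveToEs("test-index-df/post")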