I am trying to write a collection of objects into Elasticsearch from Spark. I have to meet two requirements:
- The document is already serialized as JSON and should be written as is (the record shape is sketched just below)
- The Elasticsearch document _id should be provided
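For context, the records in job look roughly like the following sketch. The Record class, its fields, and the hand-rolled serializer are purely illustrative; only the presence of an _id field and a toJson() method matter for the question.

// Hypothetical shape of the records in `job` (an RDD[Record]).
// The real class is bigger; only _id and toJson() are relevant here.
case class Record(_id: String, title: String) {
  def toJson(): String = s"""{"_id":"${_id}","title":"$title"}"""
}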
Here's what I tried so far.
saveJsonToEs()
I tried to use saveJsonToEs() like this (the serialized document contains the field _id with the desired Elasticsearch ID):
val rdd: RDD[String] = job.map{ r => r.toJson() }
val cfg = Map(
  ("es.resource", "myindex/mytype"),
  ("es.mapping.id", "_id"),
  ("es.mapping.exclude", "_id")
)
EsSpark.saveJsonToEs(rdd, cfg)
But the elasticsearch-hadoop library throws this exception:
Caused by: org.elasticsearch.hadoop.EsHadoopIllegalArgumentException: When writing data as JSON, the field exclusion feature is ignored. This is most likely not what the user intended. Bailing out...
at org.elasticsearch.hadoop.util.Assert.isTrue(Assert.java:60)
at org.elasticsearch.hadoop.rest.InitializationUtils.validateSettings(InitializationUtils.java:253)
When I remove es.mapping.id and es.mapping.exclude from the configuration, it works, but the document ID is generated by Elasticsearch (which violates requirement 2):
val rdd: RDD[String] = job.map{ r => r.toJson() }
val cfg = Map(
  ("es.resource", "myindex/mytype")
)
EsSpark.saveJsonToEs(rdd, cfg)
saveToEsWithMeta()
There is another function, saveToEsWithMeta(), that allows the _id and other metadata to be provided when inserting. It solves requirement 2 but fails on requirement 1:
val rdd: RDD[(String, String)] = job.map{
  r => r._id -> r.toJson()
}
val cfg = Map(
  ("es.resource", "myindex/mytype")
)
EsSpark.saveToEsWithMeta(rdd, cfg)
In fact, Elasticsearch is not even able to parse what the elasticsearch-hadoop library sends:
Caused by: org.apache.spark.util.TaskCompletionListenerException: Found unrecoverable error [<es_host>:9200] returned Bad Request(400) - failed to parse; Bailing out..
at org.apache.spark.TaskContextImpl.markTaskCompleted(TaskContextImpl.scala:105)
at org.apache.spark.scheduler.Task.run(Task.scala:112)
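One variation I have not tried yet is telling the connector that the values are already serialized via es.input.json. The setting itself exists in elasticsearch-hadoop, but I do not know whether it is honored on the saveToEsWithMeta() code path, so this is only an untested sketch:

val rdd: RDD[(String, String)] = job.map{
  r => r._id -> r.toJson()
}
val cfg = Map(
  ("es.resource", "myindex/mytype"),
  ("es.input.json", "true")  // untested in combination with saveToEsWithMeta()
)
EsSpark.saveToEsWithMeta(rdd, cfg)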
The question
Is it possible to write a collection of (documentID, serializedDocument) pairs from Spark into Elasticsearch (using elasticsearch-hadoop)?
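The only workaround I can think of is to re-parse the documents into maps and use saveToEs() with es.mapping.id, which defeats the point of requirement 1. A rough, unverified sketch (json4s is assumed here only because it ships with Spark):

import org.json4s._
import org.json4s.jackson.JsonMethods.parse

val rdd: RDD[Map[String, Any]] = job.map { r =>
  implicit val formats: Formats = DefaultFormats
  // re-parse the already-serialized document, which requirement 1 was meant to avoid
  parse(r.toJson()).extract[Map[String, Any]]
}
val cfg = Map(
  ("es.resource", "myindex/mytype"),
  ("es.mapping.id", "_id"),      // take the document ID from the _id field
  ("es.mapping.exclude", "_id")  // exclusion seems to be rejected only for JSON input
)
EsSpark.saveToEs(rdd, cfg)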
Thanks!
P.S. I am using Elasticsearch 5.6.3 and Spark 2.1.1.
This is a repost from https://stackoverflow.com/questions/47892705/elasticsearch-spark-write-json-with-custom-document-id