Define a custom document _id with saveJsonToEs()

I am trying to write a collection of objects from Spark into Elasticsearch. I have to meet two requirements:

  1. The document is already serialized as JSON and should be written as is (see the example right after this list)
  2. The Elasticsearch document _id should be provided explicitly
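
For concreteness, here is roughly what a single serialized record looks like; all field names except _id are made up for this example:

// Hypothetical output of r.toJson() for one record. Only the _id field matters
// for this question; the other fields are placeholders.
val exampleDoc: String =
  """{"_id": "1234-5678", "title": "some title", "body": "some content"}"""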

Here's what I tried so far.

saveJsonToEs()

I tried to use saveJsonToEs() like this (the serialized document contains a field _id with the desired Elasticsearch ID):

val rdd: RDD[String] = job.map{ r => r.toJson() }

val cfg = Map(
  ("es.resource", "myindex/mytype"),
  ("es.mapping.id", "_id"),      // take the document _id from the _id field of the JSON
  ("es.mapping.exclude", "_id")  // and do not store the _id field in the document source
)

EsSpark.saveJsonToEs(rdd, cfg)

But the elasticsearch-hadoop library throws this exception:

Caused by: org.elasticsearch.hadoop.EsHadoopIllegalArgumentException: When writing data as JSON, the field exclusion feature is ignored. This is most likely not what the user intended. Bailing out...
	at org.elasticsearch.hadoop.util.Assert.isTrue(Assert.java:60)
	at org.elasticsearch.hadoop.rest.InitializationUtils.validateSettings(InitializationUtils.java:253)

When I remove es.mapping.id and es.mapping.exclude from the configuration, it works, but the document _id is generated by Elasticsearch (which violates requirement 2):

val rdd: RDD[String] = job.map{ r => r.toJson() }

val cfg = Map(
  ("es.resource", "myindex/mytype")
)

EsSpark.saveJsonToEs(rdd, cfg)

saveToEsWithMeta()

There is another function, saveToEsWithMeta(), that allows providing the _id and other metadata for each inserted document. It solves requirement 2 but fails on requirement 1.

val rdd: RDD[(String, String)] = job.map{
  r => r._id -> r.toJson()   // (desired _id, already-serialized document)
}

val cfg = Map(
  ("es.resource", "myindex/mytype")
)

EsSpark.saveToEsWithMeta(rdd, cfg)

In fact, Elasticsearch is not even able to parse what elasticsearch-hadoop sends:

Caused by: org.apache.spark.util.TaskCompletionListenerException: Found unrecoverable error [<es_host>:9200] returned Bad Request(400) - failed to parse; Bailing out..
	at org.apache.spark.TaskContextImpl.markTaskCompleted(TaskContextImpl.scala:105)
	at org.apache.spark.scheduler.Task.run(Task.scala:112)
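
My guess (I have not inspected the actual request, so this is only an assumption) is that the connector serializes the already-serialized string a second time, so the document body reaches Elasticsearch as a quoted string rather than a JSON object, roughly like this:

// What r.toJson() returns: a JSON object.
val original: String = """{"title": "some title"}"""
// What I suspect is sent as the document body after a second serialization
// (a guess for illustration, not taken from the actual wire traffic):
val suspected: String = "\"{\\\"title\\\": \\\"some title\\\"}\""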

The question

Is it possible to write a collection of (documentID, serializedDocument) pairs from Spark into Elasticsearch (using elasticsearch-hadoop)?

Thanks!

P.S. I am using Elasticsearch 5.6.3 and Spark 2.1.1.

This is a repost from https://stackoverflow.com/questions/47892705/elasticsearch-spark-write-json-with-custom-document-id
