Pushing Data to Elasticsearch from Spark

Hello All,

I am running into an issue when trying to use the Spark Scala library to push data to an index within our Elasticsearch cluster. I have a DStream[(String, String)] with the first String being the '_id' I want the document to have and the value (the second String) being a JSONified version of the document.

I try to save the DStream using this line:

EsSparkStreaming.saveToEsWithMeta(transformed, "my-index/tmp", Map(ES_INPUT_JSON -> true.toString))

When I do this I get the following error:

17/01/06 16:26:19 ERROR JobScheduler: Error running job streaming job 1483745150000 ms.0
org.apache.spark.SparkException: Job aborted due to stage failure: Task 1 in stage 0.0 failed 1 times, most recent failure: Lost task 1.0 in stage 0.0 (TID 1, localhost): org.elasticsearch.hadoop.rest.EsHadoopInvalidRequest: Unexpected character ('d' (code 100)) in numeric value: Exponent indicator not followed by a digit
at [Source: [B@33014046; line: 1, column: 23]
at org.elasticsearch.hadoop.rest.RestClient.checkResponse(RestClient.java:488)
at org.elasticsearch.hadoop.rest.RestClient.execute(RestClient.java:446)
at org.elasticsearch.hadoop.rest.RestClient.execute(RestClient.java:436)
at org.elasticsearch.hadoop.rest.RestClient.bulk(RestClient.java:185)
at org.elasticsearch.hadoop.rest.RestRepository.tryFlush(RestRepository.java:220)
at org.elasticsearch.hadoop.rest.RestRepository.flush(RestRepository.java:242)
at org.elasticsearch.hadoop.rest.RestRepository.doWriteToIndex(RestRepository.java:182)
at org.elasticsearch.hadoop.rest.RestRepository.writeToIndex(RestRepository.java:159)
at org.elasticsearch.spark.rdd.EsRDDWriter.write(EsRDDWriter.scala:67)
at org.elasticsearch.spark.rdd.EsSpark$$anonfun$doSaveToEs$1.apply(EsSpark.scala:102)
at org.elasticsearch.spark.rdd.EsSpark$$anonfun$doSaveToEs$1.apply(EsSpark.scala:102)
at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:70)
at org.apache.spark.scheduler.Task.run(Task.scala:86)
at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:274)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
at java.lang.Thread.run(Thread.java:745)

This incorrect parsing results in a cascading set of failures all related to JSON parse errors.

After firing up Wireshark I was able to capture the data sent to the cluster and found this:

{"index":{"_id":4315ede3:dff5:3127:9d15:dc692d825cc8}}
{JSON DOCUMENT}

It looks like the library either isn't encoding the _id correctly (not passing as a JSON string) or I have a misunderstanding and the _id field is meant to be numeric.

Any help in troubleshooting this issue would be greatly appreciated!

Well, the ID is not quoted, so in JSON terms it would have to be a number. An ID should be quoted.

https://www.elastic.co/guide/en/elasticsearch/reference/current/mapping-id-field.html

Beyond that, I have reached the limit of what I know about what you're doing and what I can see.

Yeah, I figured that the _id was not being encoded as a JSON string, but I am not sure why this is happening.

You probably want to open an issue on GitHub for the library, or maybe someone else will have an idea.

That came to my mind yesterday after I found a workaround, since it does seem like a bug.

For anyone who has this same issue: make the quotation marks (") part of the string itself, so that the ID value literally begins and ends with a double quote.

E.g.
Java
String s = "\"THIS IS MY ID\"";

Scala
val id = s""""This is my id""""
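
To apply the same workaround to every pair before saving, something along these lines might work. This is only a rough sketch: EsIdQuoting, quoteId and quoteIds are made-up names (they are not part of elasticsearch-hadoop), and the escaping only covers backslashes and embedded double quotes, not control characters. It runs on a plain Seq so it can be tried without a Spark cluster.

```scala
object EsIdQuoting {
  // Hypothetical helper, not part of elasticsearch-hadoop: wraps an ID in
  // double quotes so it is sent as a JSON string. Backslashes and embedded
  // quotes are escaped first so the result stays valid JSON.
  def quoteId(id: String): String = {
    val escaped = id.replace("\\", "\\\\").replace("\"", "\\\"")
    "\"" + escaped + "\""
  }

  // Applies the quoting to the ID of every (id, jsonDocument) pair.
  def quoteIds(pairs: Seq[(String, String)]): Seq[(String, String)] =
    pairs.map { case (id, json) => (quoteId(id), json) }

  def main(args: Array[String]): Unit = {
    // Sample pair using the ID from the capture; the document body is made up.
    val sample = Seq(("4315ede3:dff5:3127:9d15:dc692d825cc8", """{"field":"value"}"""))
    quoteIds(sample).foreach { case (id, json) =>
      // Mimics the bulk action line from the capture, now with a quoted _id.
      println(s"""{"index":{"_id":$id}}""")
      println(json)
    }
  }
}
```

On the DStream from the original post, the same function could presumably be applied with transformed.map { case (id, json) => (EsIdQuoting.quoteId(id), json) } before the call to saveToEsWithMeta. If the library is later fixed to quote the ID itself, this extra quoting would need to be removed.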

Bug report has been created.

Well, glad you found a workaround and opened a bug report on it. It always helps everyone.
