Hello All,
I am running into an issue using the Elasticsearch Spark Scala library to push data to an index in our Elasticsearch cluster. I have a DStream[(String, String)] where the first String is the '_id' I want the document to have and the second String is the document serialized as JSON.
I try to save the DStream using this line:
EsSparkStreaming.saveToEsWithMeta(transformed, "my-index/tmp", Map(ES_INPUT_JSON -> true.toString))
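For completeness, here is a minimal, self-contained version of what the job boils down to. The queue-backed stream and the sample document body are stand-ins for my real source; the save call is exactly the one above:

import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.streaming.dstream.DStream
import org.elasticsearch.hadoop.cfg.ConfigurationOptions.ES_INPUT_JSON
import org.elasticsearch.spark.streaming.EsSparkStreaming
import scala.collection.mutable

val conf = new SparkConf().setAppName("es-stream-repro").setMaster("local[2]")
// es.nodes and the other connector settings are applied elsewhere in my real config
val ssc = new StreamingContext(conf, Seconds(5))

// One (id, json) pair standing in for my real stream; the document body is made up
val queue = mutable.Queue(ssc.sparkContext.makeRDD(Seq(
  ("4315ede3:dff5:3127:9d15:dc692d825cc8", """{"field":"value"}""")
)))
val transformed: DStream[(String, String)] = ssc.queueStream(queue)

// _1 should become the document's _id; _2 is passed through as raw JSON
EsSparkStreaming.saveToEsWithMeta(transformed, "my-index/tmp", Map(ES_INPUT_JSON -> true.toString))

ssc.start()
ssc.awaitTermination()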
When I do this I get the following error:
17/01/06 16:26:19 ERROR JobScheduler: Error running job streaming job 1483745150000 ms.0
org.apache.spark.SparkException: Job aborted due to stage failure: Task 1 in stage 0.0 failed 1 times, most recent failure: Lost task 1.0 in stage 0.0 (TID 1, localhost): org.elasticsearch.hadoop.rest.EsHadoopInvalidRequest: Unexpected character ('d' (code 100)) in numeric value: Exponent indicator not followed by a digit
at [Source: [B@33014046; line: 1, column: 23]
at org.elasticsearch.hadoop.rest.RestClient.checkResponse(RestClient.java:488)
at org.elasticsearch.hadoop.rest.RestClient.execute(RestClient.java:446)
at org.elasticsearch.hadoop.rest.RestClient.execute(RestClient.java:436)
at org.elasticsearch.hadoop.rest.RestClient.bulk(RestClient.java:185)
at org.elasticsearch.hadoop.rest.RestRepository.tryFlush(RestRepository.java:220)
at org.elasticsearch.hadoop.rest.RestRepository.flush(RestRepository.java:242)
at org.elasticsearch.hadoop.rest.RestRepository.doWriteToIndex(RestRepository.java:182)
at org.elasticsearch.hadoop.rest.RestRepository.writeToIndex(RestRepository.java:159)
at org.elasticsearch.spark.rdd.EsRDDWriter.write(EsRDDWriter.scala:67)
at org.elasticsearch.spark.rdd.EsSpark$$anonfun$doSaveToEs$1.apply(EsSpark.scala:102)
at org.elasticsearch.spark.rdd.EsSpark$$anonfun$doSaveToEs$1.apply(EsSpark.scala:102)
at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:70)
at org.apache.spark.scheduler.Task.run(Task.scala:86)
at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:274)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
at java.lang.Thread.run(Thread.java:745)
This parse failure then cascades into a series of further errors, all related to JSON parsing.
After firing up Wireshark, I was able to capture the bulk request sent to the cluster and found this:
{"index":{"_id":4315ede3:dff5:3127:9d15:dc692d825cc8}}
{JSON DOCUMENT}
It looks like the library either isn't encoding the _id correctly (it isn't passed as a quoted JSON string, so the parser reads '4315e' as the start of a number with an exponent and then trips over the 'd', exactly matching the error above), or I have a misunderstanding and the _id field is meant to be numeric.
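For comparison, this is what I would expect a well-formed bulk action line to look like, with the _id quoted as a JSON string:

{"index":{"_id":"4315ede3:dff5:3127:9d15:dc692d825cc8"}}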
Any help in troubleshooting this issue would be greatly appreciated!