Pushing Data to Elasticsearch from Spark


#1

Hello All,

I am running into an issue when trying to use the Spark Scala library to push data to an index within our Elasticsearch cluster. I have a DStream[(String, String)] with the first String being the '_id' I want the document to have and the value (the second String) being a JSONified version of the document.

I try to save the DStream using this line:

EsSparkStreaming.saveToEsWithMeta(transformed, "my-index/tmp", Map(ES_INPUT_JSON -> true.toString))

When I do this I get the following error:

17/01/06 16:26:19 ERROR JobScheduler: Error running job streaming job 1483745150000 ms.0
org.apache.spark.SparkException: Job aborted due to stage failure: Task 1 in stage 0.0 failed 1 times, most recent failure: Lost task 1.0 in stage 0.0 (TID 1, localhost): org.elasticsearch.hadoop.rest.EsHadoopInvalidRequest: Unexpected character ('d' (code 100)) in numeric value: Exponent indicator not followed by a digit
at [Source: [B@33014046; line: 1, column: 23]
at org.elasticsearch.hadoop.rest.RestClient.checkResponse(RestClient.java:488)
at org.elasticsearch.hadoop.rest.RestClient.execute(RestClient.java:446)
at org.elasticsearch.hadoop.rest.RestClient.execute(RestClient.java:436)
at org.elasticsearch.hadoop.rest.RestClient.bulk(RestClient.java:185)
at org.elasticsearch.hadoop.rest.RestRepository.tryFlush(RestRepository.java:220)
at org.elasticsearch.hadoop.rest.RestRepository.flush(RestRepository.java:242)
at org.elasticsearch.hadoop.rest.RestRepository.doWriteToIndex(RestRepository.java:182)
at org.elasticsearch.hadoop.rest.RestRepository.writeToIndex(RestRepository.java:159)
at org.elasticsearch.spark.rdd.EsRDDWriter.write(EsRDDWriter.scala:67)
at org.elasticsearch.spark.rdd.EsSpark$$anonfun$doSaveToEs$1.apply(EsSpark.scala:102)
at org.elasticsearch.spark.rdd.EsSpark$$anonfun$doSaveToEs$1.apply(EsSpark.scala:102)
at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:70)
at org.apache.spark.scheduler.Task.run(Task.scala:86)
at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:274)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
at java.lang.Thread.run(Thread.java:745)

This incorrect parsing results in a cascading set of failures all related to JSON parse errors.

After firing up Wireshark I was able to capture the data sent to the cluster and found this:

{"index":{"_id":4315ede3:dff5:3127:9d15:dc692d825cc8}}
{JSON DOCUMENT}

It looks like the library either isn't encoding the _id correctly (it isn't being passed as a JSON string), or I have a misunderstanding and the _id field is meant to be numeric.
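For comparison, a well-formed bulk request would quote the _id so it parses as a JSON string; the expected bytes on the wire would look like this (same captured ID, just quoted):

```json
{"index":{"_id":"4315ede3:dff5:3127:9d15:dc692d825cc8"}}
{JSON DOCUMENT}
```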

Any help in troubleshooting this issue would be greatly appreciated!


(Ed) #2

Well, the ID is not quoted, so in JSON terms it would have to be a number, but an ID should be a quoted string.

https://www.elastic.co/guide/en/elasticsearch/reference/current/mapping-id-field.html

Beyond that is the limit of my knowledge of what you're doing, from what I can see.


#3

Yeah, I figured that the _id was not being properly encoded as a JSON string, but I am not sure why this is happening.


(Ed) #4

You probably want to go open an issue on GitHub for the library, or maybe someone else will have an idea.


#5

That occurred to me yesterday after I found a workaround, since it does seem like a bug.

For anyone who hits this same issue: just include quotation marks (") as part of the ID string itself.

E.g.

Java:
String id = "\"THIS IS MY ID\"";

Scala:
val id = "\"This is my id\""
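To make the workaround concrete in the context of the original pipeline, here is a minimal sketch. The quoteId helper and the quoted variable are hypothetical names, not part of elasticsearch-hadoop; the Spark calls are shown as comments since they need a running streaming context:

```scala
// Hypothetical helper: embed literal double quotes in the id so the
// connector emits it as a JSON string in the bulk metadata line.
def quoteId(id: String): String = "\"" + id + "\""

// Applied to the (_id, jsonDoc) pairs before saving
// (assumes "transformed" is the DStream[(String, String)] from the original post):
// val quoted = transformed.map { case (id, doc) => (quoteId(id), doc) }
// EsSparkStreaming.saveToEsWithMeta(quoted, "my-index/tmp",
//   Map(ES_INPUT_JSON -> true.toString))
```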


#6

Bug report has been created.


(Ed) #7

Well, glad you found a workaround and opened a bug report on it. That always helps everyone.


(system) #8

This topic was automatically closed 28 days after the last reply. New replies are no longer allowed.