Cannot insert into a geo_shape field using EsSpark


(Tarek Izemrane) #1

Hello!

I am trying to insert an RDD into Elasticsearch using EsSpark, targeting a geo_shape field, but I get the following exception:

(Spark 2.1.0, Elasticsearch 5.1.2, Scala 2.11.8)

[details=Exception]17/05/15 18:08:50 INFO EsRDDWriter: Writing to [test/frcommune]
17/05/15 18:08:51 ERROR Executor: Exception in task 0.0 in stage 0.0 (TID 0)
org.elasticsearch.hadoop.rest.EsHadoopInvalidRequest: Found unrecoverable error [127.0.0.1:9200] returned Bad Request(400) - failed to parse [location.coordinates]; Bailing out..
at org.elasticsearch.hadoop.rest.RestClient.processBulkResponse(RestClient.java:250)
at org.elasticsearch.hadoop.rest.RestClient.bulk(RestClient.java:202)
at org.elasticsearch.hadoop.rest.RestRepository.tryFlush(RestRepository.java:220)
at org.elasticsearch.hadoop.rest.RestRepository.flush(RestRepository.java:242)
at org.elasticsearch.hadoop.rest.RestRepository.doWriteToIndex(RestRepository.java:196)
at org.elasticsearch.hadoop.rest.RestRepository.writeToIndex(RestRepository.java:159)
at org.elasticsearch.spark.rdd.EsRDDWriter.write(EsRDDWriter.scala:67)
at org.elasticsearch.spark.rdd.EsSpark$$anonfun$doSaveToEs$1.apply(EsSpark.scala:102)
at org.elasticsearch.spark.rdd.EsSpark$$anonfun$doSaveToEs$1.apply(EsSpark.scala:102)
at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:87)
at org.apache.spark.scheduler.Task.run(Task.scala:99)
at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:282)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
at java.lang.Thread.run(Thread.java:748)
17/05/15 18:08:51 WARN TaskSetManager: Lost task 0.0 in stage 0.0 (TID 0, localhost, executor driver): org.elasticsearch.hadoop.rest.EsHadoopInvalidRequest: Found unrecoverable error [/details]

Here is the index creation script:

[details=Index creation & mapping]curl -XPUT http://localhost:9200/test_v2 -d '{"settings" : { "index" : { "number_of_shards" : 1, "number_of_replicas" : 1 }}}'

curl -XPUT http://localhost:9200/test_v2/_mapping/frcommune -d '{"frcommune":{"properties":{"code" :{"type":"string","index":"not_analyzed"},"name":{"type":"string","index":"not_analyzed"},"location": {"type": "geo_shape"}}}}'[/details]

Here is the Scala code:

val features = spark.sparkContext.parallelize(geojson.parseJson.convertTo[FeatureCollection])

val communes = features.map { f =>
  (Map(ID -> f("code").toString()),
   Map("code" -> f("code").toString(),
       "name" -> f("nom").toString(),
       "location" -> f.geometry.toJson.convertTo[Map[String,String]]))
}

communes.cache()

communes.collect().foreach(println)

EsSpark.saveToEsWithMeta(communes, "test/frcommune")
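For reference, here is a minimal, stdlib-only sketch (no spray-json, names illustrative) of what I suspect is happening: converting the nested GeoJSON geometry into a `Map[String,String]` forces the coordinates arrays to be rendered as a single String, so the field no longer reaches Elasticsearch as the nested numeric arrays that geo_shape expects.

```scala
// Sketch only: shows how flattening a nested geometry into
// Map[String, String] coerces the coordinates array into a String.
object GeometrySketch {
  // Nested structure, as GeoJSON has it: arrays of numeric positions
  // (coordinates abbreviated).
  val geometry: Map[String, Any] = Map(
    "type" -> "Polygon",
    "coordinates" -> Seq(Seq(Seq(3.8399, 47.9052), Seq(3.8235, 47.9097)))
  )

  // Analogue of convertTo[Map[String, String]]: every value,
  // including the nested arrays, becomes a String.
  val flattened: Map[String, String] =
    geometry.map { case (k, v) => k -> v.toString }

  def main(args: Array[String]): Unit = {
    // The coordinates are now a plain String, not nested arrays.
    println(flattened("coordinates"))
    println(flattened("coordinates").getClass)
  }
}
```

If that is right, keeping the geometry as a nested structure (rather than `Map[String,String]`) should let the connector serialize it as real JSON arrays.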

Here is the output of the println:

[details=Println](Map(ID -> "89062"),Map(code -> "89062", name -> "Carisey", location -> Map(type -> "Polygon", coordinates -> [[[3.8399523957353,47.905263951005],[3.823557291192,47.909701028458],[3.8170534286331,47.916349809515],[3.8134456073723,47.926064276035],[3.8196556915806,47.930977407181],[3.838024293864,47.932700023793],[3.8553336755893,47.937761088151],[3.8639075161289,47.937963865241],[3.8729140877996,47.933528695327],[3.8735411899523,47.924030776409],[3.8714049601457,47.917312769629],[3.8626367387543,47.91252552914],[3.8399523957353,47.905263951005]]])))
[/details]

I also tried to save JSON with EsSpark.saveJsonToEs:

[details=Exception]17/05/15 18:29:04 ERROR Executor: Exception in task 0.0 in stage 0.0 (TID 0)
org.elasticsearch.hadoop.rest.EsHadoopInvalidRequest: Found unrecoverable error [127.0.0.1:9200] returned Bad Request(400) - object mapping for [location] tried to parse field [location] as object, but found a concrete value; Bailing out..
at org.elasticsearch.hadoop.rest.RestClient.processBulkResponse(RestClient.java:250)
at org.elasticsearch.hadoop.rest.RestClient.bulk(RestClient.java:202)
at org.elasticsearch.hadoop.rest.RestRepository.tryFlush(RestRepository.java:220)
at org.elasticsearch.hadoop.rest.RestRepository.flush(RestRepository.java:242)
at org.elasticsearch.hadoop.rest.RestRepository.doWriteToIndex(RestRepository.java:196)
at org.elasticsearch.hadoop.rest.RestRepository.writeToIndex(RestRepository.java:159)
at org.elasticsearch.spark.rdd.EsRDDWriter.write(EsRDDWriter.scala:67)
at org.elasticsearch.spark.rdd.EsSpark$$anonfun$doSaveToEs$1.apply(EsSpark.scala:102)
at org.elasticsearch.spark.rdd.EsSpark$$anonfun$doSaveToEs$1.apply(EsSpark.scala:102)
at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:87)
at org.apache.spark.scheduler.Task.run(Task.scala:99)
at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:282)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
at java.lang.Thread.run(Thread.java:748)
17/05/15 18:29:04 WARN TaskSetManager: Lost task 0.0 in stage 0.0 (TID 0, localhost, executor driver): org.elasticsearch.hadoop.rest.EsHadoopInvalidRequest: Found unrecoverable error [/details]

with this output from println:

{"code":""92072"","name":""Sèvres"","location":"{"type":"Polygon","coordinates":[[[2.2055079612116,48.809200070282],[2.1970479300107,48.819085531359],[2.1897735373152,48.815759946803],[2.1785222082775,48.81485662027],[2.1963481868208,48.821981384064],[2.1994332651192,48.829785415549],[2.2211803346028,48.827865817554],[2.2240496902355,48.835149697724],[2.232638353852,48.822737823569],[2.2253321048442,48.823021690884],[2.2203156949819,48.815545696912],[2.2055079612116,48.809200070282]]]}"}
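Looking at that output, `location` appears to be serialized as a quoted String rather than a nested JSON object (and the inner quotes are not even escaped), which would match the "found a concrete value" message. Here is a sketch (plain Scala, coordinates abbreviated, field values illustrative) contrasting a string-valued `location` (shown properly escaped) with the nested object I believe geo_shape expects:

```scala
// Sketch only: the difference between location as a JSON string
// (a "concrete value" to Elasticsearch) and as a nested object.
object GeoShapeDocSketch {
  // location is a String value: its whole body sits inside quotes,
  // with the inner quotes escaped. Elasticsearch sees a concrete value.
  val sentDoc: String =
    """{"code":"92072","name":"Sevres","location":"{\"type\":\"Polygon\",\"coordinates\":[[[2.2,48.8],[2.19,48.81],[2.2,48.8]]]}"}"""

  // location is a nested JSON object with numeric coordinate arrays,
  // which is what a geo_shape mapping should be able to parse.
  val expectedDoc: String =
    """{"code":"92072","name":"Sevres","location":{"type":"Polygon","coordinates":[[[2.2,48.8],[2.19,48.81],[2.2,48.8]]]}}"""

  def main(args: Array[String]): Unit = {
    println(sentDoc)
    println(expectedDoc)
  }
}
```

So if the geometry is serialized to a String before being embedded in the document, the object-vs-concrete-value error above would follow.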

Any help, please? :slight_smile:


(system) #2

This topic was automatically closed 28 days after the last reply. New replies are no longer allowed.