Cannot index nested documents with ES-Hadoop 6.x.x jar

I have the following input JSON data, with 'user' as a nested object:

{"id":"1","user":[{"first":"John","last":"Smith","amount":100.0}]}
{"id":"2","user":[{"first":"Jane","last":"Hay","amount":50.0}]}

Before indexing this data, I am creating an index called 'my_index' (using curl) with the following mapping:

{
	"mappings": {
		"my_type": {
			"properties": {
				"id": {
					"type": "keyword"
				},
				"user": {
					"type": "nested",
					"properties": {
						"first": {
							"type": "keyword"
						},
						"last": {
							"type": "keyword"
						},
						"amount": {
							"type": "float"
						}
					}
				}
			}
		}
	}
}
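For reference, the index-creation call looks roughly like this (a sketch; the host, credentials, and the mapping.json file name are placeholders, with mapping.json holding the JSON above):

curl -u user:pass -H "Content-Type: application/json" \
     -X PUT "http://xx.xxx.x.xxx:9200/my_index" \
     -d @mapping.json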

I am then using the ES-Hadoop 6.x jar (ES cluster version 6.2.2) with PySpark 1.6.0 to index the documents with the following code:

from pyspark import SparkContext, SparkConf, sql
from pyspark.sql import HiveContext, Row, SQLContext
from pyspark.sql.functions import *
from pyspark.sql.types import *
import json

conf = SparkConf().setAppName('temp1')

es_nodes="xx.xxx.x.xxx"
es_port="9200"
es_user="user"
es_pass="pass"

hdfs_input_dir="/path/to/input"

## Set configuration for ES indexing
es_write_conf = {
    "es.nodes": es_nodes,
    "es.port": es_port,
    "es.resource": "my_index/my_type",
    "es.net.http.auth.user": es_user,
    "es.net.http.auth.pass": es_pass,
    "es.mapping.id": "id",
    "es.input.json": "true",
    "es.nodes.wan.only": "true",
    "es.write.operation": "index"
}

sc = SparkContext(conf=conf)
sqlContext = HiveContext(sc)

df=sqlContext.read.json(hdfs_input_dir)
df.printSchema()

## Convert df to rdd containing tuples in the format: (es_doc_id, doc)
rdd_to_index = df.rdd.map(lambda row: row.asDict()).map(
    lambda x: (x['id'], json.dumps(x)))
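
## For reference, each tuple handed to ES-Hadoop should end up looking roughly
## like this (a sketch based on the sample input above; key order may differ):
##   ("1", '{"id": "1", "user": [{"first": "John", "last": "Smith", "amount": 100.0}]}')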

## Index input data
rdd_to_index.saveAsNewAPIHadoopFile(
    path='-',
    outputFormatClass="org.elasticsearch.hadoop.mr.EsOutputFormat",
    keyClass="org.apache.hadoop.io.NullWritable",
    valueClass="org.elasticsearch.hadoop.mr.LinkedMapWritable",
    conf=es_write_conf)

However, I run into an exception related to the nested object 'user'. The exception is: object mapping for [user] tried to parse field [null] as object, but found a concrete value; Bailing out..

More details on the exception:

Driver Stacktrace:
18/03/30 14:43:47 INFO scheduler.DAGScheduler: Job 7 failed: saveAsNewAPIHadoopFile at PythonRDD.scala:782, took 12.767684 s
Traceback (most recent call last):
  File "/home/user/test-indexing.py", line 44, in <module>
    conf=es_write_conf)
  File "/opt/cloudera/parcels/CDH-5.12.0-1.cdh5.12.0.p0.29/lib/spark/python/lib/pyspark.zip/pyspark/rdd.py", line 1374, in saveAsNewAPIHadoopFile
  File "/opt/cloudera/parcels/CDH-5.12.0-1.cdh5.12.0.p0.29/lib/spark/python/lib/py4j-0.9-src.zip/py4j/java_gateway.py", line 813, in __call__
  File "/opt/cloudera/parcels/CDH-5.12.0-1.cdh5.12.0.p0.29/lib/spark/python/lib/pyspark.zip/pyspark/sql/utils.py", line 45, in deco
  File "/opt/cloudera/parcels/CDH-5.12.0-1.cdh5.12.0.p0.29/lib/spark/python/lib/py4j-0.9-src.zip/py4j/protocol.py", line 308, in get_return_value
py4j.protocol.Py4JJavaError: An error occurred while calling z:org.apache.spark.api.python.PythonRDD.saveAsNewAPIHadoopFile.
: org.apache.spark.SparkException: Job aborted due to stage failure: Task 0 in stage 8.0 failed 4 times, most recent failure: Lost task 0.3 in stage 8.0 (TID 15, hostname.domain.com, executor 5): org.elasticsearch.hadoop.rest.EsHadoopInvalidRequest: Found unrecoverable error [xx.xxx.xx.xxx:9200] returned Bad Request(400) - object mapping for [user] tried to parse field [null] as object, but found a concrete value; Bailing out..
	at org.elasticsearch.hadoop.rest.RestClient.processBulkResponse(RestClient.java:251)
	at org.elasticsearch.hadoop.rest.RestClient.bulk(RestClient.java:203)
	at org.elasticsearch.hadoop.rest.RestRepository.tryFlush(RestRepository.java:248)
	at org.elasticsearch.hadoop.rest.RestRepository.flush(RestRepository.java:270)
	at org.elasticsearch.hadoop.rest.RestRepository.close(RestRepository.java:295)
	at org.elasticsearch.hadoop.mr.EsOutputFormat$EsRecordWriter.doClose(EsOutputFormat.java:214)
	at org.elasticsearch.hadoop.mr.EsOutputFormat$EsRecordWriter.close(EsOutputFormat.java:196)
	at org.apache.spark.rdd.PairRDDFunctions$$anonfun$saveAsNewAPIHadoopDataset$1$$anonfun$12$$anonfun$apply$5.apply$mcV$sp(PairRDDFunctions.scala:1119)
	at org.apache.spark.util.Utils$.tryWithSafeFinallyAndFailureCallbacks(Utils.scala:1297)
	at org.apache.spark.rdd.PairRDDFunctions$$anonfun$saveAsNewAPIHadoopDataset$1$$anonfun$12.apply(PairRDDFunctions.scala:1119)
	at org.apache.spark.rdd.PairRDDFunctions$$anonfun$saveAsNewAPIHadoopDataset$1$$anonfun$12.apply(PairRDDFunctions.scala:1091)
	at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:66)
	at org.apache.spark.scheduler.Task.run(Task.scala:89)
	at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:242)
	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
	at java.lang.Thread.run(Thread.java:748)

I tried another input dataset that also contained a nested object and ended up with the same exception.

This leads me to believe it may be an issue with the ES-Hadoop 6.x jar itself. Note: I have also tried all the major and minor versions of the ES-Hadoop 6.x.x jars, with no luck.

Any help or thoughts on this are greatly appreciated. Thanks!

Can you print rdd_to_index? (There's a quick sketch of one way to inspect it at the end of this reply.) I don't use Spark to index my DataFrame directly; what I usually do is use the bulk processor to index documents in batches:

// bulkRequest is a BulkRequestBuilder obtained from client.prepareBulk();
// s holds the raw JSON of the whole record, and proper holds the nested "proper" object.
bulkRequest.add(client.prepareIndex("geo_data_test", "data_test")
        .setSource(jsonBuilder()
                .startObject()
                .field("type", new JSONObject(s.toString()).get("type"))
                .startObject("proper")
                .field("gid", new JSONObject(proper.toString()).get("gid").toString())
                .field("objectid", new JSONObject(proper.toString()).get("objectid"))
                .field("nombre_del", new JSONObject(proper.toString()).get("nombre_del"))
                .field("tipo_de_si", new JSONObject(proper.toString()).get("tipo_de_si"))
                .field("codigo_de_", new JSONObject(proper.toString()).get("codigo_de_"))
                .endObject()
                .endObject()
        )
);
// bulkRequest.get() (or execute().actionGet()) then flushes the batch.

This is what I do; here, geometry and proper are nested fields.
Hope it helps.
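
As for inspecting rdd_to_index, a minimal sketch (reusing the variable name from the script above; this runs on the driver, so keep take() small):

## Pull a couple of (doc_id, json_string) tuples back to the driver and check
## whether each entry of 'user' actually serializes as a JSON object.
for doc_id, doc_json in rdd_to_index.take(2):
    print(doc_id)
    print(doc_json)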
