I have the following input JSON data (one document per line), with 'user' as a nested object:
{"id":"1","user":[{"first":"John","last":"Smith","amount":100.0}]}
{"id":"2","user":[{"first":"Jane","last":"Hay","amount":50.0}]}
Before indexing this data, I am creating an index called 'my_index' (using curl) with the following mapping:
{
  "mappings": {
    "my_type": {
      "properties": {
        "id": {
          "type": "keyword"
        },
        "user": {
          "type": "nested",
          "properties": {
            "first": {
              "type": "keyword"
            },
            "last": {
              "type": "keyword"
            },
            "amount": {
              "type": "float"
            }
          }
        }
      }
    }
  }
}
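For reference, the curl call that applies this mapping looks roughly like this (host and credentials are placeholders, and I am assuming the mapping above is saved locally as mapping.json):

# create my_index with the nested mapping (placeholder host/credentials)
curl -u user:pass -XPUT "http://xx.xxx.x.xxx:9200/my_index" \
  -H "Content-Type: application/json" \
  -d @mapping.json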
I am then using the ES-Hadoop 6.x jar (ES cluster version 6.2.2) with PySpark 1.6.0 to index the documents using the following code:
from pyspark import SparkContext, SparkConf, sql
from pyspark.sql import HiveContext, Row, SQLContext
from pyspark.sql.functions import *
from pyspark.sql.types import *
import json
conf = SparkConf().setAppName('temp1')
es_nodes="xx.xxx.x.xxx"
es_port="9200"
es_user="user"
es_pass="pass"
hdfs_input_dir="/path/to/input"
## Set configuration for ES indexing
es_write_conf = {
    "es.nodes": es_nodes,
    "es.port": es_port,
    "es.resource": "my_index/my_type",
    "es.net.http.auth.user": es_user,
    "es.net.http.auth.pass": es_pass,
    "es.mapping.id": "id",
    "es.input.json": "true",
    "es.nodes.wan.only": "true",
    "es.write.operation": "index"
}
sc = SparkContext(conf=conf)
sqlContext = HiveContext(sc)
df=sqlContext.read.json(hdfs_input_dir)
df.printSchema()
## Convert df to rdd containing tuples in the format: (es_doc_id, doc)
rdd_to_index = df.rdd.map(lambda row: row.asDict()).map(
    lambda x: (x['id'], json.dumps(x)))
## Index input data
rdd_to_index.saveAsNewAPIHadoopFile(
    path='-',
    outputFormatClass="org.elasticsearch.hadoop.mr.EsOutputFormat",
    keyClass="org.apache.hadoop.io.NullWritable",
    valueClass="org.elasticsearch.hadoop.mr.LinkedMapWritable",
    conf=es_write_conf)
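For anyone reproducing this, a quick way to inspect the exact (doc_id, json_doc) tuple that gets handed to ES-Hadoop would be something along these lines (a debugging sketch, not part of the failing job above):

## Debugging sketch: look at the first tuple before indexing
sample_id, sample_doc = rdd_to_index.first()
print(sample_id)
print(sample_doc)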
However, I run into an exception with respect to the nested object 'user'. The exception is: object mapping for [user] tried to parse field [null] as object, but found a concrete value; Bailing out..
More details on the exception:
Driver Stacktrace:
18/03/30 14:43:47 INFO scheduler.DAGScheduler: Job 7 failed: saveAsNewAPIHadoopFile at PythonRDD.scala:782, took 12.767684 s
Traceback (most recent call last):
File "/home/user/test-indexing.py", line 44, in <module>
conf=es_write_conf)
File "/opt/cloudera/parcels/CDH-5.12.0-1.cdh5.12.0.p0.29/lib/spark/python/lib/pyspark.zip/pyspark/rdd.py", line 1374, in saveAsNewAPIHadoopFile
File "/opt/cloudera/parcels/CDH-5.12.0-1.cdh5.12.0.p0.29/lib/spark/python/lib/py4j-0.9-src.zip/py4j/java_gateway.py", line 813, in __call__
File "/opt/cloudera/parcels/CDH-5.12.0-1.cdh5.12.0.p0.29/lib/spark/python/lib/pyspark.zip/pyspark/sql/utils.py", line 45, in deco
File "/opt/cloudera/parcels/CDH-5.12.0-1.cdh5.12.0.p0.29/lib/spark/python/lib/py4j-0.9-src.zip/py4j/protocol.py", line 308, in get_return_value
py4j.protocol.Py4JJavaError: An error occurred while calling z:org.apache.spark.api.python.PythonRDD.saveAsNewAPIHadoopFile.
: org.apache.spark.SparkException: Job aborted due to stage failure: Task 0 in stage 8.0 failed 4 times, most recent failure: Lost task 0.3 in stage 8.0 (TID 15, hostname.domain.com, executor 5): org.elasticsearch.hadoop.rest.EsHadoopInvalidRequest: Found unrecoverable error [xx.xxx.xx.xxx:9200] returned Bad Request(400) - object mapping for [user] tried to parse field [null] as object, but found a concrete value; Bailing out..
at org.elasticsearch.hadoop.rest.RestClient.processBulkResponse(RestClient.java:251)
at org.elasticsearch.hadoop.rest.RestClient.bulk(RestClient.java:203)
at org.elasticsearch.hadoop.rest.RestRepository.tryFlush(RestRepository.java:248)
at org.elasticsearch.hadoop.rest.RestRepository.flush(RestRepository.java:270)
at org.elasticsearch.hadoop.rest.RestRepository.close(RestRepository.java:295)
at org.elasticsearch.hadoop.mr.EsOutputFormat$EsRecordWriter.doClose(EsOutputFormat.java:214)
at org.elasticsearch.hadoop.mr.EsOutputFormat$EsRecordWriter.close(EsOutputFormat.java:196)
at org.apache.spark.rdd.PairRDDFunctions$$anonfun$saveAsNewAPIHadoopDataset$1$$anonfun$12$$anonfun$apply$5.apply$mcV$sp(PairRDDFunctions.scala:1119)
at org.apache.spark.util.Utils$.tryWithSafeFinallyAndFailureCallbacks(Utils.scala:1297)
at org.apache.spark.rdd.PairRDDFunctions$$anonfun$saveAsNewAPIHadoopDataset$1$$anonfun$12.apply(PairRDDFunctions.scala:1119)
at org.apache.spark.rdd.PairRDDFunctions$$anonfun$saveAsNewAPIHadoopDataset$1$$anonfun$12.apply(PairRDDFunctions.scala:1091)
at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:66)
at org.apache.spark.scheduler.Task.run(Task.scala:89)
at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:242)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
at java.lang.Thread.run(Thread.java:748)
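To rule out the mapping itself, one of the sample documents could be indexed directly with curl, independent of Spark (sketched here with placeholder host and credentials, body copied from the first sample line above):

# index one sample document directly against the cluster
curl -u user:pass -XPUT "http://xx.xxx.x.xxx:9200/my_index/my_type/1" \
  -H "Content-Type: application/json" \
  -d '{"id":"1","user":[{"first":"John","last":"Smith","amount":100.0}]}'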
I also tried another input dataset containing a nested object and ended up with the same exception.
This leads me to believe that it may be an issue with the ES-Hadoop 6.x jar itself. Note: I have also tried all the major and minor versions of the ES-Hadoop 6.x.x jars, with no luck.
Any help or thoughts on this are greatly appreciated. Thanks!