Getting a "RDD element of type java.util.HashMap cannot be used" error

I am using PySpark and I have an RDD of complex JSON strings that I converted to Python dictionaries using json.loads. When I try to save this RDD to Elasticsearch with rdd.saveAsNewAPIHadoopFile, I get an "RDD element of type java.util.HashMap cannot be used" error.

Here is the code snippet:

import json

jsonRdd = jsonStringRdd.map(lambda x: json.loads(x))
jsonRdd.saveAsNewAPIHadoopFile(
    path='-',
    outputFormatClass="org.elasticsearch.hadoop.mr.EsOutputFormat",
    keyClass="org.apache.hadoop.io.NullWritable",
    valueClass="org.elasticsearch.hadoop.mr.LinkedMapWritable",
    conf={"es.resource": "test-index-rdd/posts"})

Here is the error:

Traceback (most recent call last):
  File "<stdin>", line 7, in <module>
  File "/opt/spark/python/pyspark/rdd.py", line 1421, in saveAsNewAPIHadoopFile
    keyConverter, valueConverter, jconf)
  File "/opt/spark/python/lib/py4j-0.10.4-src.zip/py4j/java_gateway.py", line 1133, in __call__
  File "/opt/spark/python/pyspark/sql/utils.py", line 63, in deco
    return f(*a, **kw)
  File "/opt/spark/python/lib/py4j-0.10.4-src.zip/py4j/protocol.py", line 319, in get_return_value
py4j.protocol.Py4JJavaError: An error occurred while calling z:org.apache.spark.api.python.PythonRDD.saveAsNewAPIHadoopFile.
: org.apache.spark.SparkException: RDD element of type java.util.HashMap cannot be used
  at org.apache.spark.api.python.SerDeUtil$.pythonToPairRDD(SerDeUtil.scala:238)
  at org.apache.spark.api.python.PythonRDD$.saveAsNewAPIHadoopFile(PythonRDD.scala:827)
  at org.apache.spark.api.python.PythonRDD.saveAsNewAPIHadoopFile(PythonRDD.scala)

Can anyone help me to solve this issue?

You may have to convert the hash map into a MapWritable object to use the NewAPIHadoop calls: all interaction with the connector through that API goes through the MapReduce code, which only works with Writable objects.
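For what it's worth, PySpark's saveAsNewAPIHadoopFile does accept keyConverter and valueConverter class names for doing that conversion on the JVM side. The converter class names below are hypothetical: you would have to implement them yourself in Scala or Java (classes implementing org.apache.spark.api.python.Converter) and put them on the classpath, so treat this as a rough, untested sketch of the wiring rather than something that runs out of the box.

# Rough sketch only. "example.ToNullWritableConverter" and
# "example.DictToMapWritableConverter" are hypothetical classes you would
# implement in Scala/Java (org.apache.spark.api.python.Converter) to turn
# each element into the Writable types the output format expects.
jsonRdd.map(lambda doc: (None, doc)).saveAsNewAPIHadoopFile(
    path='-',
    outputFormatClass="org.elasticsearch.hadoop.mr.EsOutputFormat",
    keyClass="org.apache.hadoop.io.NullWritable",
    valueClass="org.elasticsearch.hadoop.mr.LinkedMapWritable",
    keyConverter="example.ToNullWritableConverter",       # hypothetical
    valueConverter="example.DictToMapWritableConverter",  # hypothetical
    conf={"es.resource": "test-index-rdd/posts"})

The map to (None, doc) is only there to make the RDD a pair RDD, which is what saveAsNewAPIHadoopFile expects.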

Is there no easier way to do this? Can I somehow pass the string to ES and have it treat it as JSON and properly index it? I tried the following:

jsonStringRdd.saveAsNewAPIHadoopFile(
    path='-',
    outputFormatClass="org.elasticsearch.hadoop.mr.EsOutputFormat",
    keyClass="org.apache.hadoop.io.NullWritable",
    valueClass="org.elasticsearch.hadoop.mr.LinkedMapWritable",
    conf={"es.resource": "test-index-rdd/posts",
          "es.input.json": "true"})

but I got this error:

py4j.protocol.Py4JJavaError: An error occurred while calling z:org.apache.spark.api.python.PythonRDD.saveAsNewAPIHadoopFile.
: org.apache.spark.SparkException: RDD element of type java.lang.String cannot be used

I tried a much simpler example and got the same error.

j = [{'a': 1}, {'b': 2}]
rdd = sc.parallelize(j)
rdd.saveAsNewAPIHadoopFile(
    path='-',
    outputFormatClass="org.elasticsearch.hadoop.mr.EsOutputFormat",
    keyClass="org.apache.hadoop.io.NullWritable",
    valueClass="org.elasticsearch.hadoop.mr.LinkedMapWritable",
    conf={"es.resource": "test-index-rdd/posts",
          "es.input.json": "true"})

Traceback (most recent call last):
  File "<stdin>", line 7, in <module>
  File "/opt/spark/python/pyspark/rdd.py", line 1421, in saveAsNewAPIHadoopFile
    keyConverter, valueConverter, jconf)
  File "/opt/spark/python/lib/py4j-0.10.4-src.zip/py4j/java_gateway.py", line 1133, in __call__
  File "/opt/spark/python/pyspark/sql/utils.py", line 63, in deco
    return f(*a, **kw)
  File "/opt/spark/python/lib/py4j-0.10.4-src.zip/py4j/protocol.py", line 319, in get_return_value
py4j.protocol.Py4JJavaError: An error occurred while calling z:org.apache.spark.api.python.PythonRDD.saveAsNewAPIHadoopFile.
: org.apache.spark.SparkException: RDD element of type java.util.HashMap cannot be used
  at org.apache.spark.api.python.SerDeUtil$.pythonToPairRDD(SerDeUtil.scala:238)
  at org.apache.spark.api.python.PythonRDD$.saveAsNewAPIHadoopFile(PythonRDD.scala:827)
  at org.apache.spark.api.python.PythonRDD.saveAsNewAPIHadoopFile(PythonRDD.scala)

In the case of JSON strings, you would need to use a Text object instead of a MapWritable. The output format only works with data objects that implement the Hadoop Writable contract. To use plain String and Map objects you would need the more extensive native support available in Scala and Java.
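Roughly, from the Python side that would mean pairing each JSON string with a null key so the RDD becomes a pair RDD, and declaring Text as the value class. The following is only an untested sketch; whether the default PySpark converters actually turn the plain string into a Text here is an assumption, not something verified in this thread.

# Untested sketch: pair each JSON string with a null key so the RDD is a
# (key, value) pair RDD, declare Text as the value class, and let
# es.input.json tell the connector to treat the value as a JSON document.
jsonStringRdd.map(lambda doc: (None, doc)).saveAsNewAPIHadoopFile(
    path='-',
    outputFormatClass="org.elasticsearch.hadoop.mr.EsOutputFormat",
    keyClass="org.apache.hadoop.io.NullWritable",
    valueClass="org.apache.hadoop.io.Text",
    conf={"es.resource": "test-index-rdd/posts",
          "es.input.json": "true"})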

So there is no PySpark equivalent to rdd.saveJsonToEs(...)?

Unfortunately, not at this time. You may be able to tap into the native support by using the Spark SQL functionality with PySpark, and specifying an Elasticsearch datasource (as described later in our documentation about PySpark).
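A rough sketch of that route is below. The session setup and index name are just placeholders, and it assumes the elasticsearch-hadoop jar is on the driver and executor classpath.

from pyspark.sql import SparkSession

spark = SparkSession.builder.appName("es-write").getOrCreate()

# Parse the RDD of JSON strings into a DataFrame, then write it through the
# Elasticsearch Spark SQL datasource. "test-index-rdd/posts" is a placeholder.
df = spark.read.json(jsonStringRdd)
(df.write
    .format("org.elasticsearch.spark.sql")
    .option("es.resource", "test-index-rdd/posts")
    .mode("append")
    .save())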
