ES-Hadoop fails to serialize org.apache.hadoop.io.ShortWritable

Hi

I am using PySpark with ES-Hadoop to process Elasticsearch data.

ES 7.4.0

Spark 2.3.1

PUT test
{
    "mappings": {
        "properties": {
            "price": {
                "type": "short"
            }
        }
    }
}

PUT test/_doc/1
{
    "price": 1
}

pyspark --driver-class-path ~/jars/elasticsearch-hadoop-7.4.0.jar --jars ~/jars/elasticsearch-hadoop-7.4.0.jar

conf = {
    'es.resource': 'test',
    "es.nodes.wan.only": "true",
    "es.nodes": 'http://localhost:9200',
    "es.port": '9200',
    'es.net.http.auth.user': '',
    "es.net.http.auth.pass": '',
}
rdd = sc.newAPIHadoopRDD(inputFormatClass="org.elasticsearch.hadoop.mr.EsInputFormat",
                         keyClass="org.apache.hadoop.io.NullWritable",
                         valueClass="org.elasticsearch.hadoop.mr.LinkedMapWritable",
                         conf=conf)
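
The RDD is built lazily, so the error below only shows up once an action forces evaluation (any action such as first() or collect() does it):

rdd.first()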
"""
ERROR:
Task 0.0 in stage 1.0 (TID 1) had a not serializable result: org.apache.hadoop.io.ShortWritable
Serialization stack:
  - object not serializable (class: org.apache.hadoop.io.ShortWritable, value: 1)
  - writeObject data (class: java.util.HashMap)
  - object (class java.util.HashMap, {price=1})
  - field (class: scala.Tuple2, name: _2, type: class java.lang.Object)
  - object (class scala.Tuple2, (1,{price=1}))
  - element of array (index: 0)
  - array (class [Lscala.Tuple2;, size 1); not retrying
Traceback (most recent call last):
"""

When I change short to long, I get the correct ES data. Why is the short type not serializable?


Spark requires that all objects shared across machines implement Java's Serializable interface. Hadoop uses a different mechanism for passing data between machines -- its own Writable interface -- and most Writable objects are not Serializable. For example, ShortWritable and LongWritable are used by Hadoop, and neither implements Serializable. PySpark attempts to convert all of the Writables to ordinary Java objects (which are Serializable). The reason it works for longs is that it explicitly checks for LongWritables: spark/PythonHadoopUtil.scala at v2.3.1 · apache/spark · GitHub. Unfortunately there is no check for ShortWritables, so it just passes that Writable along, causing the error you see (spark/PythonHadoopUtil.scala at v2.3.1 · apache/spark · GitHub). It doesn't look like this has changed in more recent versions of Spark, unfortunately.

So until that is fixed in Spark, I think your options are: (1) use a long in your mapping if that's possible, (2) use the DataFrame API from PySpark, which has a lot of advantages over the RDD API anyway (see the sketch below), or (3) switch to Java or Scala.
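
For option (2), here is a minimal sketch of reading the same index through the DataFrame API. It assumes the same elasticsearch-hadoop jar is on the classpath (via --jars/--driver-class-path as above) and that the PySpark shell's SparkSession is available as spark; adjust the connection options to your cluster:

# ES-Hadoop's Spark SQL data source; rows come back as plain Spark SQL types,
# so the Writable conversion path that breaks on ShortWritable is never involved.
df = spark.read.format("org.elasticsearch.spark.sql") \
    .option("es.nodes", "localhost") \
    .option("es.port", "9200") \
    .option("es.nodes.wan.only", "true") \
    .load("test")

df.printSchema()
df.show()

The short field should come back as a regular Spark SQL ShortType column, so no Hadoop Writables are involved on the Python side.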

I created a pull request to fix this at Adding support for ShortWriables to pyspark's newAPIHadoopRDD method by masseyke · Pull Request #34838 · apache/spark · GitHub. It works in my testing.

This pull request has been merged into the Spark master branch now. The fix will be in Spark 3.3.0. See [SPARK-37598] Pyspark's newAPIHadoopRDD() method fails with ShortWritables - ASF JIRA for more details.
