PySpark - read nested object field from Elasticsearch

I am trying to read nested data from an object field in an Elasticsearch index.

The mapping of the field I'm trying to read looks like this:

   "field": {
                    "type": "object",
                    "enabled": false
                }
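
For reference, the documents I am trying to read are shaped roughly like this (illustrative values only, matching the schema below, not an actual document):

    {
        "id": "doc-1",
        "field": [
            { "f1": ["a", "b"], "f2": [1.0, 2.0], "f3": ["x"] }
        ]
    }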

I am setting the schema and initializing the DataFrame this way:

    from pyspark.sql.types import StructType, StructField, ArrayType, StringType, FloatType

    # schema for the nested entries inside "field"
    field_schema = StructType(
        [
            StructField("f1", ArrayType(StringType())),
            StructField("f2", ArrayType(FloatType())),
            StructField("f3", ArrayType(StringType()))
        ]
    )

    # top-level schema: "field" is read as an array of those structs
    schema = StructType(
        [
            StructField("id", StringType()),
            StructField("field", ArrayType(field_schema))
        ]
    )

    # Spark Elasticsearch reader (connection and read options)
    reader = (
        spark.read.format("org.elasticsearch.spark.sql")
            .option("es.nodes.wan.only", "true")
            .option("es.read.field.as.array.include", " field,field.f1, field.f2, field.f3")
            .option("es.nodes", es_node)
            .option("es.port", es_port)
    )
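
For completeness, the read itself is then triggered roughly like this (the resource name is just a placeholder for the real index/type):

    # apply the explicit schema and load from the index
    df = (
        reader
            .schema(schema)
            .load("my_index/my_type")  # placeholder resource name
    )

    # the NullPointerException below is thrown once the job actually runs
    df.show()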

However, doing it this way results in:

org.apache.spark.SparkException: Job aborted due to stage failure: Task 0 in stage 26.0 failed 1 times, most recent failure: Lost task 0.0 in stage 26.0 (TID 26, localhost, executor driver): java.lang.NullPointerException
    	at org.elasticsearch.spark.sql.ScalaEsRow.values$lzycompute(ScalaEsRow.scala:27)
    	at org.elasticsearch.spark.sql.ScalaEsRow.values(ScalaEsRow.scala:27)
    	at org.elasticsearch.spark.sql.ScalaEsRow.length(ScalaEsRow.scala:34)
    	at org.apache.spark.sql.Row$class.size(Row.scala:130)
    	at org.elasticsearch.spark.sql.ScalaEsRow.size(ScalaEsRow.scala:25)
    	at org.apache.spark.sql.catalyst.CatalystTypeConverters$StructConverter.toCatalystImpl(CatalystTypeConverters.scala:245)
    	at org.apache.spark.sql.catalyst.CatalystTypeConverters$StructConverter.toCatalystImpl(CatalystTypeConverters.scala:238)
    	at org.apache.spark.sql.catalyst.CatalystTypeConverters$CatalystTypeConverter.toCatalyst(CatalystTypeConverters.scala:103)
    	at org.apache.spark.sql.catalyst.CatalystTypeConverters$ArrayConverter$$anonfun$toCatalystImpl$2.apply(CatalystTypeConverters.scala:164)

Do the nested fields have to be declared explicitly for this to work?

The Elasticsearch version is 5.4.

Please advise.

