Reading a multi-level nested field in a Spark DataFrame


(Shrikant) #1

I am trying to read a multi-level nested field, "affiliations.org.orgName", using the Elasticsearch Spark connector from the following index:

            {"userId": 402885,
            "userUuid": "F2896A1F1A7D411A69",
            "firstName": "TRISHA",
            "lastName": "WEEMS",
            "age": 42,
            "gender": "FEMALE",
            "address": {
                "countryId": 232,
                "country": "US",
                "subdivisionId": 4074,
                "subdivision": "TN",
                "postalCode": null,
                "city": null
            },
            "categories": [
                66,
                68,
                75,
                91,
                92,
                113,
                120,
                121,
                122,
                158
            ],
            "affiliations": [
                {
                    "org": {
                        "orgName": "Dick's Sporting Goods",
                        "categories": [
                            65,
                            68,
                            85
                        ]
                    },
                    "team": {
                        "teamName": "Dick's Sporting Goods Retail Employees"
                    },
                    "expirationDate": null
                }
            ]
        }

I read it with the following code:

 val myDF = spark.read.format("org.elasticsearch.spark.sql")
      .schema(user_schema)
      .option("es.nodes", "localhost")
      .option("es.port", "9200")
      .option("es.field.read.empty.as.null","true")
      .option("es.index.read.missing.as.empty","true")
      .option("es.mapping.include", "categories, affiliations.*")
      .option("es.read.field.include", "userId, userUuid, age, firstName, categories, lastName, affiliations.org.orgName")
      .option("es.read.field.exclude", "affiliations.expirationDate, affiliations.team, affiliations.location")
      .option("es.read.field.as.array.include", "affiliations") //not working
      .load("dev21-targeting-users-1/user")

But I keep getting null for the org.orgName field in myDF:

+--------------------+-----------+
|            userUuid|org.orgName|
+--------------------+-----------+
|F2896A1F1A7D411C9...|       null|
|900FF5FE93D749CA9...|       null|
+--------------------+-----------+

My schema is:

import org.apache.spark.sql.types._

val user_schema = StructType(
  StructField("userUuid", StringType) ::
  StructField("org.orgName", ArrayType(StringType), true) :: Nil)
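To make the schema mismatch concrete: in a Spark StructType, a StructField named "org.orgName" declares one top-level column whose name happens to contain a dot; it is not a path into nested data. The sketch below (the object and value names are hypothetical) contrasts that with a schema that mirrors the document's nesting, where affiliations is an array of structs, each holding an org struct that holds orgName.

```scala
import org.apache.spark.sql.types._

object NestedSchemaSketch {
  // Innermost level: the "org" object inside each affiliation.
  val orgSchema: StructType = StructType(
    StructField("orgName", StringType, nullable = true) :: Nil)

  // One element of the "affiliations" array.
  val affiliationSchema: StructType = StructType(
    StructField("org", orgSchema, nullable = true) :: Nil)

  // Top level: affiliations is an ARRAY of the struct above.
  val userSchema: StructType = StructType(
    StructField("userUuid", StringType, nullable = true) ::
    StructField("affiliations", ArrayType(affiliationSchema), nullable = true) :: Nil)

  // By contrast, this declares a single flat column literally named "org.orgName".
  val flatSchema: StructType = StructType(
    StructField("userUuid", StringType) ::
    StructField("org.orgName", ArrayType(StringType), nullable = true) :: Nil)

  def main(args: Array[String]): Unit = {
    println(userSchema.treeString)               // nested tree: affiliations > element > org > orgName
    println(flatSchema.fieldNames.mkString(", ")) // two flat column names
  }
}
```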

Note: when I query ES using Postman, I get the correct result.


(James Baiera) #2

I'm not sure you are specifying the schema correctly here. I don't see an array named "org.orgName" in the data you provided. There is an array named "affiliations", and inside that array there is a single object whose "org" object contains the "orgName" field. Does the connector deserialize the data when you use "affiliations" instead of "org.orgName"?
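A minimal sketch of that suggestion, not tested against this cluster or connector version: leave out the hand-written .schema(...) so the connector derives the schema from the index mapping, mark affiliations as an array with es.read.field.as.array.include, include only the needed fields with es.read.field.include, and let Spark project orgName out of the array. The node, port and resource are copied from the question; the object and method names are hypothetical, and whether this avoids the problems described in this thread is not confirmed.

```scala
import org.apache.spark.sql.{DataFrame, SparkSession}
import org.apache.spark.sql.functions.col

object EsNestedReadSketch {
  // Connection settings copied from the question; adjust for your cluster.
  val esOptions: Map[String, String] = Map(
    "es.nodes" -> "localhost",
    "es.port" -> "9200",
    // Only pull the fields that are actually needed.
    "es.read.field.include" -> "userUuid,affiliations.org.orgName",
    // Tell the connector that "affiliations" holds an array of objects.
    "es.read.field.as.array.include" -> "affiliations"
  )

  def readOrgNames(spark: SparkSession): DataFrame =
    spark.read
      .format("org.elasticsearch.spark.sql")
      .options(esOptions) // no .schema(...): let the connector build it from the mapping
      .load("dev21-targeting-users-1/user")
      // Projecting through the array yields one orgName per affiliation.
      .select(col("userUuid"), col("affiliations.org.orgName").as("orgNames"))
}
```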


(Shrikant) #3

Hi James.
Yes, "affiliations" is an array of nested type, which contains org and team objects. But I don't want all the fields from "affiliations.org"; I only need orgName. Since affiliations is an array, I will get many values for orgName, which is why I used ArrayType(StringType) for org.orgName.
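On the ArrayType(StringType) point: once Spark sees affiliations as an array of structs, selecting the path affiliations.org.orgName already produces an array of strings, one entry per affiliation, so that type does not have to be declared on a dotted column name. The sketch below checks this locally with a trimmed copy of the sample document parsed via spark.read.json instead of read from Elasticsearch; the object and method names are hypothetical.

```scala
import org.apache.spark.sql.{DataFrame, SparkSession}
import org.apache.spark.sql.functions.col

object ArrayOfStructSketch {
  // Trimmed copy of the sample document; parsed locally, no Elasticsearch involved.
  val sampleDoc: String =
    """{"userUuid":"F2896A1F1A7D411A69","affiliations":[{"org":{"orgName":"Dick's Sporting Goods"}}]}"""

  def orgNames(spark: SparkSession): DataFrame = {
    import spark.implicits._
    val df = spark.read.json(Seq(sampleDoc).toDS())
    // affiliations is array<struct<org:struct<orgName:string>>>, so this path
    // resolves to array<string>: one orgName per element of affiliations.
    df.select(col("userUuid"), col("affiliations.org.orgName").as("orgNames"))
  }

  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder().master("local[1]").appName("nested-sketch").getOrCreate()
    orgNames(spark).show(false)
    spark.stop()
  }
}
```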

Even if I read everything under "affiliations" and change my schema as below:

import org.apache.spark.sql.types._

val affilations_org_schema = StructType(
  StructField("categories", ArrayType(LongType)) ::
  StructField("orgName", StringType) ::
  Nil)

val affliations_team_schema = StructType(
  StructField("teamName", StringType) ::
  Nil)

val affliations_schema = StructType(
  StructField("org", affilations_org_schema) ::
  StructField("team", affliations_team_schema) ::
  Nil)

val user_schema = StructType(
  StructField("userUuid", StringType) ::
  StructField("Affiliations", ArrayType(affliations_schema), true) :: Nil)
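One detail worth ruling out, offered as an observation rather than a confirmed cause of the exception: the sample document uses lowercase affiliations, while this schema declares Affiliations, and Elasticsearch field names are case-sensitive. The sketch below keeps the same nested shape with the lowercase name and parses a trimmed copy of the sample document locally (no Elasticsearch involved; object and method names are hypothetical), which suggests the shape itself matches the data.

```scala
import org.apache.spark.sql.{DataFrame, SparkSession}
import org.apache.spark.sql.functions.col
import org.apache.spark.sql.types._

object ExplicitNestedSchemaSketch {
  // Same shape as the schema in the post, with the lowercase field name the document uses.
  val orgSchema: StructType = StructType(
    StructField("categories", ArrayType(LongType)) ::
    StructField("orgName", StringType) :: Nil)
  val teamSchema: StructType = StructType(StructField("teamName", StringType) :: Nil)
  val affiliationSchema: StructType = StructType(
    StructField("org", orgSchema) ::
    StructField("team", teamSchema) :: Nil)
  val userSchema: StructType = StructType(
    StructField("userUuid", StringType) ::
    StructField("affiliations", ArrayType(affiliationSchema), nullable = true) :: Nil)

  // Parses a trimmed copy of the sample document with the explicit schema.
  def parseSample(spark: SparkSession): DataFrame = {
    import spark.implicits._
    val doc =
      """{"userUuid":"F2896A1F1A7D411A69","affiliations":[{"org":{"orgName":"Dick's Sporting Goods","categories":[65,68,85]},"team":{"teamName":"Dick's Sporting Goods Retail Employees"}}]}"""
    spark.read.schema(userSchema).json(Seq(doc).toDS())
  }

  // Pulls just the org names out of the nested array.
  def orgNames(df: DataFrame): DataFrame =
    df.select(col("affiliations.org.orgName").as("orgNames"))
}
```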

I get a java.lang.NullPointerException:

java.lang.NullPointerException
	at org.elasticsearch.spark.sql.ScalaEsRow.values$lzycompute(ScalaEsRow.scala:27)
	at org.elasticsearch.spark.sql.ScalaEsRow.values(ScalaEsRow.scala:27)
	at org.elasticsearch.spark.sql.ScalaEsRow.length(ScalaEsRow.scala:34)
	at org.apache.spark.sql.Row$class.size(Row.scala:130)
	at org.elasticsearch.spark.sql.ScalaEsRow.size(ScalaEsRow.scala:25)
	at org.apache.spark.sql.catalyst.CatalystTypeConverters$StructConverter.toCatalystImpl(CatalystTypeConverters.scala:238)
	at org.apache.spark.sql.catalyst.CatalystTypeConverters$StructConverter.toCatalystImpl(CatalystTypeConverters.scala:231)
	at org.apache.spark.sql.catalyst.CatalystTypeConverters$CatalystTypeConverter.toCatalyst(CatalystTypeConverters.scala:103)
	at org.apache.spark.sql.catalyst.CatalystTypeConverters$ArrayConverter$$anonfun$toCatalystImpl$2.apply(CatalystTypeConverters.scala:164)
	at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
	at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
	at scala.collection.Iterator$class.foreach(Iterator.scala:893)
	at scala.collection.AbstractIterator.foreach(Iterator.scala:1336)
	at scala.collection.IterableLike$class.foreach(IterableLike.scala:72)
	at scala.collection.AbstractIterable.foreach(Iterable.scala:54)
	at scala.collection.TraversableLike$class.map(TraversableLike.scala:234)
	at scala.collection.AbstractTraversable.map(Traversable.scala:104)
	at org.apache.spark.sql.catalyst.CatalystTypeConverters$ArrayConverter.toCatalystImpl(CatalystTypeConverters.scala:164)
	at org.apache.spark.sql.catalyst.CatalystTypeConverters$ArrayConverter.toCatalystImpl(CatalystTypeConverters.scala:154)
	at org.apache.spark.sql.catalyst.CatalystTypeConverters$CatalystTypeConverter.toCatalyst(CatalystTypeConverters.scala:103)
	at org.apache.spark.sql.catalyst.CatalystTypeConverters$$anonfun$createToCatalystConverter$2.apply(CatalystTypeConverters.scala:383)
	at org.apache.spark.sql.execution.RDDConversions$$anonfun$rowToRowRdd$1$$anonfun$apply$3.apply(ExistingRDD.scala:60)
	at org.apache.spark.sql.execution.RDDConversions$$anonfun$rowToRowRdd$1$$anonfun$apply$3.apply(ExistingRDD.scala:57)
	at scala.collection.Iterator$$anon$11.next(Iterator.scala:409)
	at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIterator.processNext(Unknown Source)
	at org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43)
	at org.apache.spark.sql.execution.WholeStageCodegenExec$$anonfun$8$$anon$1.hasNext(WholeStageCodegenExec.scala:377)
	at org.apache.spark.sql.execution.SparkPlan$$anonfun$2.apply(SparkPlan.scala:231)
	at org.apache.spark.sql.execution.SparkPlan$$anonfun$2.apply(SparkPlan.scala:225)
	at org.apache.spark.rdd.RDD$$anonfun$mapPartitionsInternal$1$$anonfun$apply$25.apply(RDD.scala:827)
	at org.apache.spark.rdd.RDD$$anonfun$mapPartitionsInternal$1$$anonfun$apply$25.apply(RDD.scala:827)
	at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
	at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:323)
	at org.apache.spark.rdd.RDD.iterator(RDD.scala:287)
	at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:87)
	at org.apache.spark.scheduler.Task.run(Task.scala:108)
	at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:335)
	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
	at java.lang.Thread.run(Thread.java:748)
