Error while reading data from an Elasticsearch index with Spark using a custom schema

I am trying to read data from an Elasticsearch index, supplying a custom schema at read time.
The data is populated by another Spark application that uses the same schema, so that a union of the DataFrames should not be a problem.
I am reading the data in the following way:

var prev_data_elastic_df = spark.read.format("org.elasticsearch.spark.sql")
            .option("query", query)
            .option("pushdown", true)
            .option("es.read.metadata", true)
            .schema(getElasticIndexSchema())
            .option("es.read.field.as.array.include","correspondence_groups,active_group_ids")
            .option("es.field.read.empty.as.null", "no")
            .load("previous_aggregate_index/_doc")

The schema for the Elasticsearch index is defined as:

import org.apache.spark.sql.types._

def getElasticIndexSchema(): StructType = {

    StructType(Array(
      StructField("_id", IntegerType),
      StructField("speed", DoubleType),
      StructField("latitude", DoubleType),
      StructField("longitude", DoubleType),
      StructField("id", StringType),
      StructField("timestamp", StringType),
      StructField("is_dtc_code", BooleanType),
      StructField("vehicle_timestamp_unix", TimestampType),
      StructField("door_status_change_desc", getDoorStatusChangeSchema()),
      StructField("engine_status_change_desc", getEngineStatusChangeSchema()),
      StructField("object_data-rental_id", StringType),
      StructField("_metadata", MapType(StringType, StringType))
    ))
  }

  def getDoorStatusChangeSchema(): StructType = {

    StructType(Array(
      StructField("previous_aid_status", StringType),
      StructField("current_aid_status", StringType),
      StructField("previous_timestamp", StringType),
      StructField("current_timestamp", StringType),
      StructField("duration_in_seconds", LongType),
      StructField("location", getLocationSchema())
    ))
  }

  def getEngineStatusChangeSchema(): StructType = {

    StructType(Array(
      StructField("previous_comply_status", StringType),
      StructField("current_comply_status", StringType),
      StructField("duration_in_seconds", LongType),
      StructField("previous_timestamp", StringType),
      StructField("current_timestamp", StringType),
      StructField("location", getLocationSchema())
    ))
  }

  def getLocationSchema(): StructType = {

    StructType(Array(
      StructField("lat", DoubleType),
      StructField("lon", DoubleType)
    ))
  }

I am still getting errors like the following:

org.elasticsearch.spark.sql.ScalaEsRow is not a valid external type for schema of string

java.lang.String is not a valid external type for schema of string

I have not been able to work out the cause. The only thing I have noticed is that when the data is written to Elasticsearch, not every field has a value: some are null, so those fields are absent from the indexed documents. Could that be the reason?
Is there a solution, or any recommendation for how to deal with this?
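
One way to narrow this down (a sketch, not a confirmed diagnosis) would be to compare the schema the connector infers from the index mapping with the custom schema, field by field. A mismatch such as a nested object declared as a string in one place would show up here. The object `SchemaDiff` and its methods are hypothetical names introduced for illustration. They only use Spark's `StructType` API and need no running cluster.

```scala
import org.apache.spark.sql.types._

// Hypothetical helper, not part of Spark or elasticsearch-hadoop.
object SchemaDiff {

  // Flatten a StructType into (dotted path -> type name) pairs,
  // recursing into nested structs such as door_status_change_desc.location.
  def flatten(schema: StructType, prefix: String = ""): Map[String, String] =
    schema.fields.toSeq.flatMap { f =>
      val path = if (prefix.isEmpty) f.name else s"$prefix.${f.name}"
      f.dataType match {
        case s: StructType => flatten(s, path).toSeq
        case other         => Seq(path -> other.simpleString)
      }
    }.toMap

  // Report every path whose type differs or that exists on one side only.
  def mismatches(expected: StructType, actual: StructType): Seq[String] = {
    val e = flatten(expected)
    val a = flatten(actual)
    (e.keySet ++ a.keySet).toSeq.sorted.flatMap { path =>
      (e.get(path), a.get(path)) match {
        case (Some(x), Some(y)) if x == y => None
        case (Some(x), Some(y)) => Some(s"$path: expected $x, found $y")
        case (Some(x), None)    => Some(s"$path: expected $x, missing in actual")
        case (None, Some(y))    => Some(s"$path: not in expected, found $y")
        case _                  => None
      }
    }
  }
}

// Possible usage against the index from the question (assumes a reachable cluster):
//   val inferred = spark.read.format("org.elasticsearch.spark.sql")
//     .load("previous_aggregate_index/_doc").schema
//   SchemaDiff.mismatches(getElasticIndexSchema(), inferred).foreach(println)
```

Fields that are reported as missing on the index side would be consistent with the observation that null values are not indexed, but that alone does not prove they cause the error.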

Hi @ashit_pupu. What does your data look like and what is the stack trace? Could you post some spark code that has data and mappings to reproduce this? If you want some examples of getting everything (code, schema, and data) all into a small bit of code you might want to look at AbstractScalaEsSparkSQL for examples (it's what I use when I'm trying to reproduce something).
