I am trying to read data from an Elasticsearch index while applying a custom schema at read time.
The data is populated by another Spark application that uses the same schema; this was done deliberately so that a union of the resulting DataFrames wouldn't be a problem.
I am reading the data in the following way:
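To illustrate why the shared schema matters for the union, here is a minimal sketch (assuming a local SparkSession; the field names are made up): two DataFrames built with the same explicit schema line up column-for-column even when one side has nulls.

```scala
import org.apache.spark.sql.{Row, SparkSession}
import org.apache.spark.sql.types._

val spark = SparkSession.builder().master("local[1]").appName("union-sketch").getOrCreate()

// The same explicit schema used by both the writer and the reader.
val schema = StructType(Array(
  StructField("id", StringType),
  StructField("speed", DoubleType)
))

// One writer may leave some fields null; with a shared schema union still lines up.
val a = spark.createDataFrame(spark.sparkContext.parallelize(Seq(Row("v1", 42.0))), schema)
val b = spark.createDataFrame(spark.sparkContext.parallelize(Seq(Row("v2", null))), schema)

val unioned = a.union(b)
```

Note that `union` resolves columns by position, not by name, which is exactly why both applications agreeing on one schema definition matters.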
var prev_data_elastic_df = spark.read.format("org.elasticsearch.spark.sql")
  .option("query", query)
  .option("pushdown", true)
  .option("es.read.metadata", true)
  .option("es.read.field.as.array.include", "correspondence_groups,active_group_ids")
  .option("es.field.read.empty.as.null", "no")
  .schema(getElasticIndexSchema())
  .load("previous_aggregate_index/_doc")
The Elasticsearch index schema is defined as:
import org.apache.spark.sql.types._

def getElasticIndexSchema(): StructType = {
  StructType(Array(
    StructField("_id", IntegerType),
    StructField("speed", DoubleType),
    StructField("latitude", DoubleType),
    StructField("longitude", DoubleType),
    StructField("id", StringType),
    StructField("timestamp", StringType),
    StructField("is_dtc_code", BooleanType),
    StructField("vehicle_timestamp_unix", TimestampType),
    StructField("door_status_change_desc", getDoorStatusChangeSchema()),
    StructField("engine_status_change_desc", getEngineStatusChangeSchema()),
    StructField("object_data-rental_id", StringType),
    StructField("_metadata", MapType(StringType, StringType))
  ))
}
def getDoorStatusChangeSchema(): StructType = {
  StructType(Array(
    StructField("previous_aid_status", StringType),
    StructField("current_aid_status", StringType),
    StructField("previous_timestamp", StringType),
    StructField("current_timestamp", StringType),
    StructField("duration_in_seconds", LongType),
    StructField("location", getLocationSchema())
  ))
}

def getEngineStatusChangeSchema(): StructType = {
  StructType(Array(
    StructField("previous_comply_status", StringType),
    StructField("current_comply_status", StringType),
    StructField("duration_in_seconds", LongType),
    StructField("previous_timestamp", StringType),
    StructField("current_timestamp", StringType),
    StructField("location", getLocationSchema())
  ))
}

def getLocationSchema(): StructType = {
  StructType(Array(
    StructField("lat", DoubleType),
    StructField("lon", DoubleType)
  ))
}
I am still getting errors like the ones below:
org.elasticsearch.spark.sql.ScalaEsRow is not a valid external type for schema of string
java.lang.String is not a valid external type for schema of string
I am still unable to pin down the reason for this. The only thing I could observe is that when the data is written to Elasticsearch, not all fields necessarily have a value; some are null, and those fields are then simply absent from the indexed documents. Could that be the reason?
Is there a solution to this, or any recommendation on how I can deal with it?
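One way I could check whether my declared schema disagrees with what the connector actually produces (a diagnostic sketch; `schemaDiff` is a made-up helper, and the "actual" schema would come from `df.schema` after a plain `load()` without `.schema(...)`):

```scala
import org.apache.spark.sql.types._

// Hypothetical helper: report fields whose declared type differs from the
// type actually produced by the connector, or which are missing entirely.
def schemaDiff(declared: StructType, actual: StructType): Seq[String] = {
  val actualByName = actual.fields.map(f => f.name -> f.dataType).toMap
  declared.fields.flatMap { f =>
    actualByName.get(f.name) match {
      case None                            => Some(s"${f.name}: missing in actual schema")
      case Some(t) if t != f.dataType      => Some(s"${f.name}: declared ${f.dataType.simpleString}, actual ${t.simpleString}")
      case _                               => None
    }
  }.toSeq
}

// Example mismatch: a field declared as integer but read back as string.
val declared = StructType(Array(StructField("_id", IntegerType), StructField("speed", DoubleType)))
val actual   = StructType(Array(StructField("_id", StringType),  StructField("speed", DoubleType)))

schemaDiff(declared, actual).foreach(println)
// prints: _id: declared int, actual string
```

Comparing the two schemas this way would at least show whether the `ScalaEsRow ... not a valid external type` errors line up with specific mismatched or absent fields.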