scala.MatchError: null - Elasticsearch nested mapping - migrating from ES 5.6.3 to 6.8.3

Hi All,

Need some help.

I am using PySpark (Spark 2.2.0, Scala 2.11.8, AWS EMR emr-5.8.0) and am trying to migrate from Elasticsearch 5.6.3 to 6.8.3.

This is the jar I have been using so far:
elasticsearch-spark-20_2.11-5.6.3

For the migration I am using the following jar and testing the PySpark jobs:
elasticsearch-spark-20_2.11-6.8.3
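
For reference, this is roughly how I pin the 6.8.3 connector to the session (a minimal sketch; the jar path and app name below are placeholders, not my real values):

from pyspark.sql import SparkSession

# Pin the 6.8.3 connector jar when building the session; the path is a
# placeholder, not my real location.
spark = (SparkSession.builder
         .appName("es-migration-test")
         .config("spark.jars", "/path/to/elasticsearch-spark-20_2.11-6.8.3.jar")
         .getOrCreate())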

I have the ES mapping shown below:

"availability": {
"properties": {
"current": {
"properties": {
"lastUpdateTm": { "type": "long" },
"size": { "type": "keyword", "fields": { "keyword": { "type": "keyword", "ignore_above": 256 } } },
"value": { "type": "long" }
}
},
"previous": {
"properties": {
"lastUpdateTm": { "type": "long" },
"size": { "type": "keyword", "fields": { "keyword": { "type": "keyword", "ignore_above": 256 } } },
"value": { "type": "long" }
}
}
}
}

PySpark Job 1:

a) I have defined the schema in the PySpark job as shown below:

from pyspark.sql.types import StructType, StructField, ArrayType, LongType, StringType

nestedSchema = StructType([
    StructField('current', ArrayType(
        StructType([
            StructField('lastUpdateTm', LongType(), nullable=True),
            StructField('size', StringType(), nullable=True),
            StructField('value', LongType(), nullable=True)
        ])
    ), nullable=True),
    StructField('previous', ArrayType(
        StructType([
            StructField('lastUpdateTm', LongType(), nullable=True),
            StructField('size', StringType(), nullable=True),
            StructField('value', LongType(), nullable=True)
        ])
    ), nullable=True)
])
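
As a quick sanity check of this schema (a sketch, assuming a SparkSession named spark; the values mirror the sample document further down):

from pyspark.sql.types import StructType, StructField, StringType

# Wrap nestedSchema as an "availability" column next to the hashKey and
# build one row with it; plain tuples map positionally onto the schema.
rowSchema = StructType([
    StructField('hashKey', StringType(), nullable=False),
    StructField('availability', nestedSchema, nullable=True)
])

sample = ('key2',
          ([(1571702400000, '10', 100)],    # current
           [(1571702400000, '30', 200)]))   # previous

spark.createDataFrame([sample], rowSchema).show(truncate=False)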

b) Write to ES:

from pyspark.sql.functions import udf, lit, concat_ws, collect_list

udfCreateObject = udf(lambda x, y: [x, y], nestedSchema)

dfOther = dfSource.groupBy('id').agg(collect_list('current').alias('aggrCurrent'),
                                     collect_list('previous').alias('aggrPrevious'))

dfTarget = dfSource.withColumn('active', lit(True)) \
    .withColumn('hashKey', concat_ws("-", "key-column1", "key-column2")) \
    .withColumn("availability", udfCreateObject(dfOther.aggrCurrent, dfOther.aggrPrevious))

writeToES(dataToWrite=dfTarget, index="test-index", docType="test-doc-type", mappingID='hashKey').save()
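
For reference, writeToES is a thin helper over the connector's dataframe writer; a minimal sketch of what it presumably wraps, using the standard es-hadoop options (the real helper has more settings):

# Sketch only: returns the writer so the caller can invoke .save().
def writeToES(dataToWrite, index, docType, mappingID):
    return (dataToWrite.write
            .format("org.elasticsearch.spark.sql")
            .option("es.mapping.id", mappingID)
            .option("es.resource", "{}/{}".format(index, docType))
            .mode("append"))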

c) Sample ES document:

{
  "_index": "test-index",
  "_type": "test-doc-type",
  "_source": {
    "hashKey": "key1",
    "active": true,
    "availability": {
      "current": null,
      "previous": null
    }
  }
},
{
  "_index": "test-index",
  "_type": "test-doc-type",
  "_source": {
    "hashKey": "key2",
    "active": true,
    "availability": {
      "current": [
        {
          "lastUpdateTm": 1571702400000,
          "size": "10",
          "value": 100
        },
        ...
      ],
      "previous": [
        {
          "lastUpdateTm": 1571702400000,
          "size": "30",
          "value": 200
        },
        ...
      ]
    }
  }
}

PySpark Job 2:

I have another PySpark job which reads the above index "test-index" / doc type "test-doc-type" from Elasticsearch and writes the documents back with updated values.

a) dfSource = readFromES(index="test-index", docType="test-doc-type").option("es.read.field.as.array.include", "availability.current,availability.previous").load()
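
(readFromES is the matching read helper; a minimal sketch, again assuming the standard connector options:)

# Sketch only: returns the reader so the caller can chain .option(...).load().
def readFromES(index, docType):
    return (spark.read
            .format("org.elasticsearch.spark.sql")
            .option("es.resource", "{}/{}".format(index, docType)))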

b) dfTarget = "Target data frame after transformations"

c) Printing the above dataframe shows the following two rows:
+---------+-----------------------------------------------------------------------------------------------------------------------------------+
|hashKey |availability |
+---------+-----------------------------------------------------------------------------------------------------------------------------------+
|key1 |[WrappedArray(null),WrappedArray(null)] |
|key2 |[WrappedArray([1571702400000,10,100], [1571702400000,20,1000]),WrappedArray([1571702400000,30,200], [1571702400000,40,1500])]|
+---------+-----------------------------------------------------------------------------------------------------------------------------------+
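
Note the key1 row above: both arrays come back as WrappedArray(null) rather than empty arrays. To isolate such rows, a filter like this can be used (a sketch; hasNullEntry is defined here purely for illustration):

from pyspark.sql.functions import col, udf
from pyspark.sql.types import BooleanType

# Diagnostic sketch: flag rows whose availability arrays contain null
# entries, like the key1 row in the output above.
hasNullEntry = udf(lambda arr: arr is not None and any(e is None for e in arr),
                   BooleanType())

dfTarget.filter(hasNullEntry(col('availability.current')) |
                hasNullEntry(col('availability.previous'))).show(truncate=False)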
d) writeToES(dataToWrite=dfTarget, index="test-index", docType="test-doc-type", mappingID='hashKey').save()

e) The above save() throws the following error:

WARN TaskSetManager: Lost task 193.0 in stage 109.0 (TID 5063, ip-10-6-102-16.us-west-2.compute.internal, executor 6): org.elasticsearch.hadoop.serialization.EsHadoopSerializationException: scala.MatchError: null
at org.elasticsearch.hadoop.serialization.bulk.BulkEntryWriter.writeBulkEntry(BulkEntryWriter.java:136)
at org.elasticsearch.hadoop.rest.RestRepository.writeToIndex(RestRepository.java:170)
at org.elasticsearch.spark.rdd.EsRDDWriter.write(EsRDDWriter.scala:74)
at org.elasticsearch.spark.sql.EsSparkSQL$$anonfun$saveToEs$1.apply(EsSparkSQL.scala:101)
at org.elasticsearch.spark.sql.EsSparkSQL$$anonfun$saveToEs$1.apply(EsSparkSQL.scala:101)
at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:87)
at org.apache.spark.scheduler.Task.run(Task.scala:108)
at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:335)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
at java.lang.Thread.run(Thread.java:748)
Caused by: scala.MatchError: null
at org.elasticsearch.spark.sql.DataFrameValueWriter.writeStruct(DataFrameValueWriter.scala:74)
... 10 more

Can anyone help me solve the above issue?

Thanks
