Hi All,
Need some help.
I am using PySpark (Spark 2.2.0, Scala 2.11.8, AWS EMR emr-5.8.0) and trying to migrate from Elasticsearch 5.6.3 to 6.8.3.
I have been using the following jar so far:
elasticsearch-spark-20_2.11-5.6.3
For the migration I am using the following jar and testing the PySpark jobs with it:
elasticsearch-spark-20_2.11-6.8.3
I have an ES mapping as shown below:
"availability": {
"properties": {
"current": {
"properties": {
"lastUpdateTm": { "type": "long" },
"size": { "type": "keyword", "fields": { "keyword": { "type": "keyword", "ignore_above": 256 } } },
"value": { "type": "long" }
}
},
"previous": {
"properties": {
"lastUpdateTm": { "type": "long" },
"size": { "type": "keyword", "fields": { "keyword": { "type": "keyword", "ignore_above": 256 } } },
"value": { "type": "long" }
}
}
}
}
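For reference, an index with this mapping can be created roughly as sketched below. This is a minimal sketch only: the host URL, the use of the requests library, and the hashKey/active field types are assumptions based on the sample documents further below.

import requests

# ES 6.x index with a single mapping type ("test-doc-type").
# Host URL and the hashKey/active field types are assumptions.
subMapping = {
    "properties": {
        "lastUpdateTm": {"type": "long"},
        "size": {"type": "keyword",
                 "fields": {"keyword": {"type": "keyword", "ignore_above": 256}}},
        "value": {"type": "long"}
    }
}
body = {
    "mappings": {
        "test-doc-type": {
            "properties": {
                "hashKey": {"type": "keyword"},
                "active": {"type": "boolean"},
                "availability": {"properties": {"current": subMapping, "previous": subMapping}}
            }
        }
    }
}
print(requests.put("http://localhost:9200/test-index", json=body).json())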
PySpark Job 1:
a) I have defined the schema in the PySpark job as shown below:
from pyspark.sql.types import StructType, StructField, ArrayType, LongType, StringType

nestedSchema = StructType([
    StructField('current', ArrayType(
        StructType([
            StructField('lastUpdateTm', LongType(), nullable=True),
            StructField('size', StringType(), nullable=True),
            StructField('value', LongType(), nullable=True)
        ])
    ), nullable=True),
    StructField('previous', ArrayType(
        StructType([
            StructField('lastUpdateTm', LongType(), nullable=True),
            StructField('size', StringType(), nullable=True),
            StructField('value', LongType(), nullable=True)
        ])
    ), nullable=True)
])
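A quick one-row sanity check of this schema looks like this (made-up values; assumes an active SparkSession named spark):

# Hypothetical one-row check of nestedSchema: each field is an array of
# (lastUpdateTm, size, value) structs.
sample = [([(1571702400000, "10", 100)], [(1571702400000, "30", 200)])]
spark.createDataFrame(sample, nestedSchema).printSchema()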
b) Write to ES:
from pyspark.sql.functions import udf, lit, concat_ws, collect_list

udfCreateObject = udf(lambda x, y: [x, y], nestedSchema)
dfOther = dfSource.groupBy('id').agg(
    collect_list('current').alias('aggrCurrent'),
    collect_list('previous').alias('aggrPrevious'))
dfTarget = dfSource.withColumn('active', lit(True)) \
    .withColumn('hashKey', concat_ws("-", "key-column1", "key-column2")) \
    .withColumn("availability", udfCreateObject(dfOther.aggrCurrent, dfOther.aggrPrevious))
writeToES(dataToWrite=dfTarget, index="test-index", docType="test-doc-type", mappingID='hashKey').save()
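writeToES is our own helper; for context, it is roughly equivalent to the sketch below (assuming the es-hadoop Spark SQL data source, with connection settings like es.nodes already in the Spark conf):

def writeToES(dataToWrite, index, docType, mappingID):
    # Rough sketch of the helper: returns a DataFrameWriter so the caller can .save().
    # Connection settings (es.nodes, es.port, ...) are assumed to be in the Spark conf.
    return (dataToWrite.write
            .format("org.elasticsearch.spark.sql")
            .option("es.mapping.id", mappingID)
            .option("es.resource", "{}/{}".format(index, docType))
            .mode("append"))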
c) Sample ES documents:
{
  "_index": "test-index",
  "_type": "test-doc-type",
  "_source": {
    "hashKey": "key1",
    "active": true,
    "availability": {
      "current": null,
      "previous": null
    }
  }
},
{
  "_index": "test-index",
  "_type": "test-doc-type",
  "_source": {
    "hashKey": "key2",
    "active": true,
    "availability": {
      "current": [
        {
          "lastUpdateTm": 1571702400000,
          "size": "10",
          "value": 100
        },
        ...
      ],
      "previous": [
        {
          "lastUpdateTm": 1571702400000,
          "size": "30",
          "value": 200
        },
        ...
      ]
    }
  }
}
PySpark Job 2:
I have another PySpark job which reads from the above Elasticsearch index "test-index" / doc type "test-doc-type" and writes it back with updated values.
a) dfSource = readFromES(index="test-index", docType="test-doc-type") \
       .option("es.read.field.as.array.include", "availability.current,availability.previous") \
       .load()
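readFromES is likewise our own helper; roughly (a sketch, under the same assumptions as the writeToES sketch above):

def readFromES(index, docType):
    # Rough sketch: returns a DataFrameReader so the caller can chain .option(...).load().
    return (spark.read
            .format("org.elasticsearch.spark.sql")
            .option("es.resource", "{}/{}".format(index, docType)))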
b) dfTarget = "Target data frame after transformations"
c) Here are two sample rows from the above dataframe after printing it:
+-------+-----------------------------------------------------------------------------------------------------------------------------+
|hashKey|availability                                                                                                                 |
+-------+-----------------------------------------------------------------------------------------------------------------------------+
|key1   |[WrappedArray(null),WrappedArray(null)]                                                                                      |
|key2   |[WrappedArray([1571702400000,10,100], [1571702400000,20,1000]),WrappedArray([1571702400000,30,200], [1571702400000,40,1500])]|
+-------+-----------------------------------------------------------------------------------------------------------------------------+
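Note the WrappedArray(null) entries for key1. A check like the one below can show how many rows carry such null entries (a sketch; hasNullEntry is a hypothetical helper, kept as a plain Python UDF for Spark 2.2 compatibility):

from pyspark.sql.functions import udf, col
from pyspark.sql.types import BooleanType

# Hypothetical helper: True when the array is null or contains a null struct entry.
hasNullEntry = udf(lambda arr: arr is None or any(e is None for e in arr), BooleanType())
dfTarget.filter(hasNullEntry(col("availability.current"))).count()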
d) writeToES(dataToWrite=dfTarget, index="test-index", docType="test-doc-type", mappingID='hashKey').save()
e) The above save() throws the following error:
WARN TaskSetManager: Lost task 193.0 in stage 109.0 (TID 5063, ip-10-6-102-16.us-west-2.compute.internal, executor 6): org.elasticsearch.hadoop.serialization.EsHadoopSerializationException: scala.MatchError: null
at org.elasticsearch.hadoop.serialization.bulk.BulkEntryWriter.writeBulkEntry(BulkEntryWriter.java:136)
at org.elasticsearch.hadoop.rest.RestRepository.writeToIndex(RestRepository.java:170)
at org.elasticsearch.spark.rdd.EsRDDWriter.write(EsRDDWriter.scala:74)
at org.elasticsearch.spark.sql.EsSparkSQL$$anonfun$saveToEs$1.apply(EsSparkSQL.scala:101)
at org.elasticsearch.spark.sql.EsSparkSQL$$anonfun$saveToEs$1.apply(EsSparkSQL.scala:101)
at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:87)
at org.apache.spark.scheduler.Task.run(Task.scala:108)
at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:335)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
at java.lang.Thread.run(Thread.java:748)
Caused by: scala.MatchError: null
at org.elasticsearch.spark.sql.DataFrameValueWriter.writeStruct(DataFrameValueWriter.scala:74)
... 10 more
Can anyone help me solve this issue?
Thanks