I'm trying to update a nested field in Elasticsearch from PySpark (via the elasticsearch-hadoop connector) using a scripted update. The code I'm using is:
update_params = "new_samples: samples"
update_script = "ctx._source.samples += new_samples"
es_conf = {
    "es.mapping.id": "id",
    "es.mapping.exclude": "id",
    "es.write.operation": "upsert",
    "es.update.script.params": update_params,
    "es.update.script.inline": update_script,
}
(
    result.write.format("org.elasticsearch.spark.sql")
    .options(**es_conf)
    .option("es.nodes", configuration["elasticsearch"]["host"])
    .option("es.port", configuration["elasticsearch"]["port"])
    .save(
        configuration["elasticsearch"]["index_name"] + "/" + configuration["version"],
        mode="append",
    )
)
And the schema of the field is:
 |-- samples: array (nullable = true)
 |    |-- element: struct (containsNull = true)
 |    |    |-- gq: integer (nullable = true)
 |    |    |-- dp: integer (nullable = true)
 |    |    |-- gt: string (nullable = true)
 |    |    |-- adBug: array (nullable = true)
 |    |    |    |-- element: integer (containsNull = true)
 |    |    |-- ad: double (nullable = true)
 |    |    |-- sample: string (nullable = true)
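To make the intent concrete: the result I expect from the script is that the incoming row's samples are appended to the samples array already stored in the document. Below is a minimal pure-Python sketch of that expected behaviour only; it does not involve Spark or Elasticsearch. The helper name apply_update and all field values are hypothetical and made up for illustration.

```python
# Pure-Python illustration of the intended upsert result.
# No Spark or Elasticsearch is involved; all values are made up.

def apply_update(existing_doc, new_samples):
    """Mimic the intended effect of `ctx._source.samples += new_samples`."""
    updated = dict(existing_doc)
    # Concatenate the stored samples with the incoming ones, leaving the input untouched.
    updated["samples"] = list(existing_doc.get("samples", [])) + list(new_samples)
    return updated

# Hypothetical document already stored in the index.
existing_doc = {
    "samples": [
        {"gq": 99, "dp": 30, "gt": "0/1", "adBug": [15, 15], "ad": 0.5, "sample": "S1"},
    ]
}

# Hypothetical samples carried by the incoming Spark row.
new_samples = [
    {"gq": 80, "dp": 22, "gt": "1/1", "adBug": [0, 22], "ad": 1.0, "sample": "S2"},
]

updated_doc = apply_update(existing_doc, new_samples)
print([s["sample"] for s in updated_doc["samples"]])  # prints ['S1', 'S2']
```

In other words, each element of samples is a struct, and I want the update to extend the existing array rather than replace it.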
And I get the following error:
py4j.protocol.Py4JJavaError: An error occurred while calling o83.save.
: org.apache.spark.SparkException: Job aborted due to stage failure: Task 0 in stage 1.0 failed 1 times, most recent failure: Lost task 0.0 in stage 1.0 (TID 1, localhost, executor driver): java.lang.ClassCastException: scala.collection.mutable.WrappedArray$ofRef cannot be cast to scala.Tuple2
Am I doing something wrong or is this a bug? Thanks.