Upsert nested fields with Spark

I'm trying to update a nested field using Spark and a scripted update. The code I'm using is:

update_params = "new_samples: samples"
update_script = "ctx._source.samples += new_samples"

es_conf = {
    "es.mapping.id": "id",
    "es.mapping.exclude": "id",
    "es.write.operation": "upsert",
    "es.update.script.params": update_params,
    "es.update.script.inline": update_script
}

result.write.format("org.elasticsearch.spark.sql") \
    .options(**es_conf) \
    .option("es.nodes", configuration["elasticsearch"]["host"]) \
    .option("es.port", configuration["elasticsearch"]["port"]) \
    .save(configuration["elasticsearch"]["index_name"] + "/" + configuration["version"], mode='append')

And the schema of the field is:

|-- samples: array (nullable = true)
| |-- element: struct (containsNull = true)
| | |-- gq: integer (nullable = true)
| | |-- dp: integer (nullable = true)
| | |-- gt: string (nullable = true)
| | |-- adBug: array (nullable = true)
| | | |-- element: integer (containsNull = true)
| | |-- ad: double (nullable = true)
| | |-- sample: string (nullable = true)

And I get the following error:

py4j.protocol.Py4JJavaError: An error occurred while calling o83.save.
: org.apache.spark.SparkException: Job aborted due to stage failure: Task 0 in stage 1.0 failed 1 times, most recent failure: Lost task 0.0 in stage 1.0 (TID 1, localhost, executor driver): java.lang.ClassCastException: scala.collection.mutable.WrappedArray$ofRef cannot be cast to scala.Tuple2

Am I doing something wrong or is this a bug? Thanks.

It's difficult to say without the full stack trace. Can you provide it?

The full stack trace is:

https://pastebin.com/34tFHqYj

I couldn't paste it here because it exceeded the maximum number of characters.

Thanks again!

Ahah, yes, this is a known issue: https://github.com/elastic/elasticsearch-hadoop/issues/931

We have a set of interfaces that define how to convert between integration-specific record types and JSON values. For SparkSQL, each value in a row needs to be placed in a specific spot in the row order. Since JSON doesn't make any guarantees about field order, we need the schema available to go between Row objects and raw values. When we serialize the data in SparkSQL, we pass in a Tuple2 with the schema in one slot and the record in the other. This somewhat breaks the contract we have built for these serialization tools in other places, though, most notably where we try to extract a value to be used as a script parameter.
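
Purely as an illustration of that point (this is not the ES-Hadoop code, just the idea in PySpark): a Row is positional, so rebuilding one from an unordered JSON object needs the schema to decide which value lands in which slot. The field names here are made up.

from pyspark.sql import Row
from pyspark.sql.types import StructType, StructField, StringType, IntegerType

# Hypothetical two-field schema, for illustration only
schema = StructType([
    StructField("id", StringType()),
    StructField("dp", IntegerType()),
])

doc = {"dp": 12, "id": "abc"}               # JSON gives no ordering guarantee
row = Row(*[doc[f.name] for f in schema])   # the schema fixes the slot order: ('abc', 12)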

Ok, thanks for the fast reply, James! Is there a workaround for the moment, or will it be fixed in future releases?

This will be fixed in a future release. I do not believe there is any workaround for it at the moment aside from trying to keep all script parameters limited to basic data types.
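
For example, a minimal sketch of what that could look like (hypothetical: the counter column, the total field and the connection settings are made up), where the only script parameter is a primitive value:

update_params = "inc:counter"               # "counter" is a plain integer column
update_script = "ctx._source.total += inc"  # same script style as above, primitive param only

es_conf = {
    "es.mapping.id": "id",
    "es.mapping.exclude": "id",
    "es.write.operation": "upsert",
    "es.update.script.params": update_params,
    "es.update.script.inline": update_script
}

df.write.format("org.elasticsearch.spark.sql") \
    .options(**es_conf) \
    .option("es.nodes", "localhost") \
    .option("es.port", "9200") \
    .save("my_index/my_type", mode="append")

Anything nested, like the samples array of structs above, will hit the Tuple2 cast described earlier until the linked issue is fixed.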

Ok, thanks!!
