How to add array elements to an existing array in Elasticsearch using a Painless script?

Below is the schema of my Spark DataFrame:

root
 |-- customerid: long (nullable = true)
 |-- leadid: long (nullable = true)
 |-- statusid: integer (nullable = true)
 |-- islaststatus: string (nullable = true)
 |-- hist: array (nullable = true)
 |    |-- element: struct (containsNull = true)
 |    |    |-- statusid: integer (nullable = true)
 |    |    |-- islaststatus: string (nullable = true)
 |    |    |-- createdon: timestamp (nullable = true)
 |    |    |-- updatedon: timestamp (nullable = true)

I am trying to append elements to the hist array in Elasticsearch using a Painless script from Spark. Below is the code I have tried:

def load_df_to_es(es_index, dataframe, operation="upsert", mode="append", mapping_id="id"):
    esconf = {
        "es.mapping.id": mapping_id,
        "es.nodes": "hostname",
        "es.port": "9200",
        "es.write.operation": operation,
        "es.index.auto.create": True,
        # Painless script that appends each element of params.new_hist to the existing hist array
        "es.update.script.inline": "for (item in params.new_hist)  {ctx._source.hist.add(item);}",
        # Bind the script parameter new_hist to the hist column of the DataFrame
        "es.update.script.params": "new_hist:hist"
    }
    dataframe.write.format("org.elasticsearch.spark.sql").options(**esconf).mode(mode).save(es_index)

load_df_to_es("my_index", my_dataframe, "upsert", "append", "customerid")

I am getting the error below while adding array elements to the existing array. It seems the array is not being passed into the script params. Please suggest if I am missing something.

File "/home/adarshbajpai/Downloads/spark-2.4.4-bin-hadoop2.7/python/lib/py4j-0.10.7-src.zip/py4j/protocol.py", line 328, in get_return_value
py4j.protocol.Py4JJavaError: An error occurred while calling o1457.save.
: org.apache.spark.SparkException: Job aborted due to stage failure: Task 19 in stage 230.0 failed 1 times, most recent failure: Lost task 19.0 in stage 230.0 (TID 14534, localhost, executor driver): org.elasticsearch.hadoop.serialization.EsHadoopSerializationException: java.lang.ClassCastException: scala.collection.mutable.WrappedArray$ofRef cannot be cast to scala.Tuple2
	at org.elasticsearch.hadoop.serialization.bulk.BulkEntryWriter.writeBulkEntry(BulkEntryWriter.java:136)
	at org.elasticsearch.hadoop.rest.RestRepository.writeToIndex(RestRepository.java:170)
	at org.elasticsearch.spark.rdd.EsRDDWriter.write(EsRDDWriter.scala:74)
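
One workaround sketch I could try, assuming the official elasticsearch Python client (7.x-style API) is available on the executors, is to issue the scripted update row by row and build the script params in Python instead of going through es.update.script.params:

from elasticsearch import Elasticsearch

def update_hist_partition(rows):
    # One client per partition; host and port are placeholders
    es = Elasticsearch(hosts=[{"host": "hostname", "port": 9200}])
    for row in rows:
        # Convert the array of structs into plain dicts so it is JSON-serializable
        new_hist = [item.asDict() for item in (row.hist or [])]
        es.update(
            index="my_index",
            id=row.customerid,
            body={
                "script": {
                    "lang": "painless",
                    "source": "for (item in params.new_hist) { ctx._source.hist.add(item); }",
                    "params": {"new_hist": new_hist}
                }
            }
        )

my_dataframe.rdd.foreachPartition(update_hist_partition)

This keeps the same Painless script but avoids pushing the array column through the connector's script-parameter handling.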
