Below is the schema of my dataframe:
root
|-- customerid: long (nullable = true)
|-- leadid: long (nullable = true)
|-- statusid: integer (nullable = true)
|-- islaststatus: string (nullable = true)
|-- hist: array (nullable = true)
| |-- element: struct (containsNull = true)
| | |-- statusid: integer (nullable = true)
| | |-- islaststatus: string (nullable = true)
| | |-- createdon: timestamp (nullable = true)
| | |-- updatedon: timestamp (nullable = true)
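(For reproducibility, this schema corresponds to the following PySpark StructType; the field names and types come from the printSchema output above, nothing else is assumed:)

from pyspark.sql.types import (ArrayType, IntegerType, LongType,
                               StringType, StructField, StructType,
                               TimestampType)

# Schema of one element of the hist array.
hist_element = StructType([
    StructField("statusid", IntegerType()),
    StructField("islaststatus", StringType()),
    StructField("createdon", TimestampType()),
    StructField("updatedon", TimestampType()),
])

# Full dataframe schema, matching the printSchema output above.
schema = StructType([
    StructField("customerid", LongType()),
    StructField("leadid", LongType()),
    StructField("statusid", IntegerType()),
    StructField("islaststatus", StringType()),
    StructField("hist", ArrayType(hist_element)),
])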
I am trying to append elements to the hist array in Elasticsearch using a Painless script through the es-hadoop Spark connector. Below is the code I have tried:
def load_df_to_es(es_index, dataframe, operation="upsert", mode="append", mapping_id="id"):
    esconf = {
        "es.mapping.id": mapping_id,
        "es.nodes": "hostname",
        "es.port": "9200",
        "es.write.operation": operation,
        "es.index.auto.create": True,
        "es.update.script.inline": "for (item in params.new_hist) {ctx._source.hist.add(item);}",
        "es.update.script.params": "new_hist:hist",
    }
    dataframe.write.format("org.elasticsearch.spark.sql").options(**esconf).mode(mode).save(es_index)

load_df_to_es("my_index", my_dataframe, "upsert", "append", "customerid")
However, I am getting the error below when the same update goes through the connector. It seems the connector is not able to pass the array into params. Please suggest if I am missing something:
File "/home/adarshbajpai/Downloads/spark-2.4.4-bin-hadoop2.7/python/lib/py4j-0.10.7-src.zip/py4j/protocol.py", line 328, in get_return_value
py4j.protocol.Py4JJavaError: An error occurred while calling o1457.save.
: org.apache.spark.SparkException: Job aborted due to stage failure: Task 19 in stage 230.0 failed 1 times, most recent failure: Lost task 19.0 in stage 230.0 (TID 14534, localhost, executor driver): org.elasticsearch.hadoop.serialization.EsHadoopSerializationException: java.lang.ClassCastException: scala.collection.mutable.WrappedArray$ofRef cannot be cast to scala.Tuple2
at org.elasticsearch.hadoop.serialization.bulk.BulkEntryWriter.writeBulkEntry(BulkEntryWriter.java:136)
at org.elasticsearch.hadoop.rest.RestRepository.writeToIndex(RestRepository.java:170)
at org.elasticsearch.spark.rdd.EsRDDWriter.write(EsRDDWriter.scala:74)
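If the connector simply cannot pass an array through es.update.script.params, the only fallback I can think of is to issue the updates myself with the elasticsearch-py bulk helper from foreachPartition (an untested sketch; the host, index, script, and use of customerid as the document id mirror the code above), though I would prefer to keep using the connector:

from elasticsearch import Elasticsearch, helpers

def upsert_partition(rows):
    # One client per partition; same placeholder host as above.
    es = Elasticsearch(["http://hostname:9200"])
    actions = (
        {
            "_op_type": "update",
            "_index": "my_index",  # ES 6.x may additionally require a "_type" entry
            "_id": row["customerid"],
            "script": {
                "lang": "painless",
                "source": "for (item in params.new_hist) {ctx._source.hist.add(item);}",
                # The bulk API accepts a real array here, unlike the connector option.
                "params": {"new_hist": [h.asDict() for h in (row["hist"] or [])]},
            },
            "upsert": row.asDict(recursive=True),
        }
        for row in rows
    )
    helpers.bulk(es, actions)

my_dataframe.foreachPartition(upsert_partition)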