How can I save data of type Array[String] to ES with a painless script?

In my project, I tried to save data from Spark Streaming to ES — to be exact, to update a structured table. I don't know whether there is a better way to update data in ES; I used a painless script for the update, like this:

def fooUpdate(dfNewData: DataFrame): Unit = {
  val upParams = "ports:port_list, type:asset_type, reqCnt:request_connect_count, " +
    "rsvCnt:receive_connect_count, timestamp:discover_time"

  val upScript = "int flag = 0; if ((null != params.ports) && (0 != params.ports.length)) ......"

  val esWriteConf = Map(
    "es.write.operation"      -> "upsert",
    "es.mapping.id"           -> "ip_addr",
    "es.update.script.lang"   -> "painless",
    "es.update.script.params" -> upParams,
    "es.update.script.inline" -> upScript
  )
  dfNewData.saveToEs("spark/netAsset", esWriteConf)   // execution fails here
}

The table schema is:
root
|-- mac_addr: string (nullable = false)
|-- ip_addr: string (nullable = false)
|-- port_list: array (nullable = false)
| |-- element: string (containsNull = true)
|-- asset_type: string (nullable = false)
|-- request_connect_count: long (nullable = false)
|-- receive_connect_count: long (nullable = false)
|-- discover_time: timestamp (nullable = false)
|-- ports_offline: array (nullable = false)
| |-- element: string (containsNull = true)
|-- ports_online_new: array (nullable = false)
| |-- element: string (containsNull = true)
|-- last_change_time: timestamp (nullable = false)

The main error message is:
org.elasticsearch.hadoop.serialization.EsHadoopSerializationException: java.lang.ClassCastException: scala.collection.mutable.WrappedArray$ofRef cannot be cast to scala.Tuple2

I tried many times to find the reason. The problem seems to be caused by the Array[String] data type: as the code shows, if I delete the "ports:port_list" parameter, the code runs fine, and if I change the type of the "port_list" column to String, it also runs OK.
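Since changing port_list to String makes the write succeed, one workaround sketch (an assumption on my part, not an official elasticsearch-hadoop feature) is to flatten the array into a single delimited string column on the Spark side — e.g. with org.apache.spark.sql.functions.concat_ws(",", col("port_list")) — and split it back inside the painless script (String.splitOnToken is available in painless from ES 6.4 onward). The helper names below (PortListWorkaround, flatten, rebuild) are illustrative only; they model the round trip in plain Scala:

```scala
// Hypothetical workaround: pass the array as one delimited string so
// es.update.script.params only ever sees scalar values.
object PortListWorkaround {
  // Spark side: what concat_ws(",", col("port_list")) produces per row
  def flatten(ports: Seq[String]): String = ports.mkString(",")

  // Painless side: equivalent of params.ports.splitOnToken(",")
  def rebuild(flat: String): Seq[String] =
    if (flat.isEmpty) Seq.empty else flat.split(",").toSeq
}
```

With this, upParams would reference the flattened column ("ports:port_list_str, ...") and the script would rebuild the list before using it.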

I can't find any example of saving a complex data type to ES, nor a more detailed description of the "es.update.script.params" setting. So is this a problem in my code, a bug in ES, or something else? Any help is much appreciated!

My project environment:

• elasticsearch 6.6.1
• elasticsearch-spark-20
• scala 2.11.7
• spark 2.4.1
• spark-streaming-kafka-0-10

Update — the full stack trace:

19/05/27 11:28:44 WARN TaskSetManager: Lost task 3.0 in stage 36.0 (TID 2015, 10.8.70.14, executor 0): org.elasticsearch.hadoop.serialization.EsHadoopSerializationException: java.lang.ClassCastException: scala.collection.mutable.WrappedArray$ofRef cannot be cast to scala.Tuple2
at org.elasticsearch.hadoop.serialization.bulk.BulkEntryWriter.writeBulkEntry(BulkEntryWriter.java:136)
at org.elasticsearch.hadoop.rest.RestRepository.writeToIndex(RestRepository.java:170)
at org.elasticsearch.spark.rdd.EsRDDWriter.write(EsRDDWriter.scala:71)
at org.elasticsearch.spark.sql.EsSparkSQL$$anonfun$saveToEs$1.apply(EsSparkSQL.scala:101)
at org.elasticsearch.spark.sql.EsSparkSQL$$anonfun$saveToEs$1.apply(EsSparkSQL.scala:101)
at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:90)
at org.apache.spark.scheduler.Task.run(Task.scala:121)
at org.apache.spark.executor.Executor$TaskRunner$$anonfun$10.apply(Executor.scala:403)
at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1360)
at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:409)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
at java.lang.Thread.run(Thread.java:748)
Caused by: java.lang.ClassCastException: scala.collection.mutable.WrappedArray$ofRef cannot be cast to scala.Tuple2
at org.elasticsearch.spark.sql.DataFrameValueWriter.write(DataFrameValueWriter.scala:53)
at org.elasticsearch.hadoop.serialization.bulk.AbstractBulkFactory$FieldWriter.doWrite(AbstractBulkFactory.java:152)
at org.elasticsearch.hadoop.serialization.bulk.AbstractBulkFactory$FieldWriter.write(AbstractBulkFactory.java:118)
at org.elasticsearch.hadoop.serialization.bulk.TemplatedBulk.writeTemplate(TemplatedBulk.java:80)
at org.elasticsearch.hadoop.serialization.bulk.TemplatedBulk.write(TemplatedBulk.java:56)
at org.elasticsearch.hadoop.serialization.bulk.BulkEntryWriter.writeBulkEntry(BulkEntryWriter.java:68)
