In my project, I am trying to save data from a Spark stream to Elasticsearch; to be exact, I need to update documents that contain a struct/array field. I don't know whether there is a better way to update data in ES. I used a painless script for the update, like this:
def fooUpdate(dfNewData: DataFrame): Unit = {
  val upParams = "ports:port_list, type:asset_type, reqCnt:request_connect_count, " +
    "rsvCnt:receive_connect_count, timestamp:discover_time"
  val upScript = "int flag = 0;if((null != params.ports) && (0 != params.ports.length))......"
  val esWriteConf = Map(
    "es.write.operation"      -> "upsert",
    "es.mapping.id"           -> "ip_addr",
    "es.update.script.lang"   -> "painless",
    "es.update.script.params" -> upParams,
    "es.update.script.inline" -> upScript
  )
  dfNewData.saveToEs("spark/netAsset", esWriteConf) // the error is thrown here
}
The table schema is:
root
|-- mac_addr: string (nullable = false)
|-- ip_addr: string (nullable = false)
|-- port_list: array (nullable = false)
| |-- element: string (containsNull = true)
|-- asset_type: string (nullable = false)
|-- request_connect_count: long (nullable = false)
|-- receive_connect_count: long (nullable = false)
|-- discover_time: timestamp (nullable = false)
|-- ports_offline: array (nullable = false)
| |-- element: string (containsNull = true)
|-- ports_online_new: array (nullable = false)
| |-- element: string (containsNull = true)
|-- last_change_time: timestamp (nullable = false)
The main error message is:
org.elasticsearch.hadoop.serialization.EsHadoopSerializationException: java.lang.ClassCastException: scala.collection.mutable.WrappedArray$ofRef cannot be cast to scala.Tuple2
I tried many times to find the cause; the problem seems to be the Array[String] data type. As the code shows, if I delete the "ports:port_list" entry from the params, the code runs fine, and if I change the type of the "port_list" column to String, it also runs OK.
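Building on the observation that a plain String column works, one possible workaround is to flatten the array into a single delimited string on the Spark side (e.g. with concat_ws) and split it back inside the script. This is only a sketch under my own assumptions: the column name port_list_str, the comma separator, and the use of Painless's splitOnToken are illustrative, not from the original code.

```scala
// Sketch of the flatten/split workaround, assuming a comma never
// appears inside a port entry.
object PortParamWorkaround {
  // Spark side: Array[String] -> "80,443,8080"
  // (on a DataFrame this could be withColumn("port_list_str",
  //  concat_ws(",", col("port_list"))); shown here as plain Scala)
  def flattenPorts(ports: Seq[String]): String = ports.mkString(",")

  // The write conf stays the same except the script param now maps
  // to the flattened string column instead of the array column.
  val esWriteConf: Map[String, String] = Map(
    "es.write.operation"      -> "upsert",
    "es.mapping.id"           -> "ip_addr",
    "es.update.script.lang"   -> "painless",
    "es.update.script.params" -> "ports:port_list_str, type:asset_type",
    // Painless side: split the string back into a list before using it
    // (splitOnToken is available in recent Painless versions).
    "es.update.script.inline" -> "def ports = params.ports.splitOnToken(',');"
  )
}
```

If any port string could contain the separator, a different delimiter (or a JSON-encoded string parsed in the script) would be safer.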
I can't find any example of saving complex data types to ES as script params, nor a more detailed description of the "es.update.script.params" setting. So is this a problem in my code, a bug in ES-Hadoop, or something else? Any help is much appreciated!
My project environment:
- elasticsearch 6.6.1
- elasticsearch-spark-20
- scala 2.11.7
- spark 2.4.1
- spark-streaming-kafka-0-10