Spark elasticsearch 5.0.2 scala.MatchError

Hello,
I'm using Spark 2.0 with the elasticsearch-spark connector version 5.0.2 (Scala 2.11):
"org.elasticsearch" % "elasticsearch-spark-20_2.11" % "5.0.2"

When trying to read from an index I'm getting the following error:
scala.MatchError: Buffer() (of class scala.collection.convert.Wrappers$JListWrapper)

This is how I'm getting the data (session is an instance of SparkSession):
session.sqlContext.read.format("org.elasticsearch.spark.sql").options(opt).load(esIndexName)

opt is a Map of options:
Map("es.read.field.as.array.include" -> "fieldNames",
"es.input.json" -> "true",
"es.field.read.empty.as.null" -> "true",
"es.index.read.missing.as.empty" -> "true",
"es.read.field.exclude" -> excludedFields)

The field in question is not an array; it's a nested object field:
a {
  b {
    c = "string value"
  }
}

I found a similar issue here:

But after upgrading to the latest elasticsearch-spark library version, I'm still seeing the problem.
I would appreciate any suggestions on how to fix this.
Thanks,

M

Spark stack trace:

WARN TaskSetManager: Lost task 1.0 in stage 0.0 (TID 1, host): scala.MatchError: Buffer() (of class scala.collection.convert.Wrappers$JListWrapper)
at org.apache.spark.sql.catalyst.CatalystTypeConverters$StringConverter$.toCatalystImpl(CatalystTypeConverters.scala:296)
at org.apache.spark.sql.catalyst.CatalystTypeConverters$StringConverter$.toCatalystImpl(CatalystTypeConverters.scala:295)
at org.apache.spark.sql.catalyst.CatalystTypeConverters$CatalystTypeConverter.toCatalyst(CatalystTypeConverters.scala:103)
at org.apache.spark.sql.catalyst.CatalystTypeConverters$StructConverter.toCatalystImpl(CatalystTypeConverters.scala:261)
at org.apache.spark.sql.catalyst.CatalystTypeConverters$StructConverter.toCatalystImpl(CatalystTypeConverters.scala:251)
at org.apache.spark.sql.catalyst.CatalystTypeConverters$CatalystTypeConverter.toCatalyst(CatalystTypeConverters.scala:103)
at org.apache.spark.sql.catalyst.CatalystTypeConverters$StructConverter.toCatalystImpl(CatalystTypeConverters.scala:261)
at org.apache.spark.sql.catalyst.CatalystTypeConverters$StructConverter.toCatalystImpl(CatalystTypeConverters.scala:251)
at org.apache.spark.sql.catalyst.CatalystTypeConverters$CatalystTypeConverter.toCatalyst(CatalystTypeConverters.scala:103)
at org.apache.spark.sql.catalyst.CatalystTypeConverters$ArrayConverter$$anonfun$toCatalystImpl$2.apply(CatalystTypeConverters.scala:164)
at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)

Hello,
I'm going to answer my own question.
I was not able to find a solution to this problem. The answers I found online referred to older versions of the elasticsearch-spark library and claimed the problem had been fixed in the latest release.
Seeing that the latest version still exhibited the bug, I decided to skip the DataFrame conversion altogether, read the ES index as raw JSON, and parse the data myself.

You can do that using code similar to this (Scala):
import org.elasticsearch.spark._ // brings esJsonRDD into scope on SparkContext

val readCfg = Map("setting" -> "value") // any es.* read settings you need
val tuples = session.sparkContext.esJsonRDD("myEsIndex", readCfg)

"tuples" is a collection of Tuple2[String, String] items, where the first one is the entry index and the second one is the entry body (JSON text). You can parse JSON text using Playframework JSON library, for example.
I hope this helps,

M
