scala.MatchError when trying to read data from Elasticsearch


(Sandeep Gunnam) #1

Hi,
I am running Spark 1.3.1 in local mode with Scala 2.10. I have some data indexed into Elasticsearch and am trying to read it using the hiveContext.esDF method. It works for data with other schemas but fails for the document shown below:
{
  "Name": "ken1",
  "businessCategory": "BMS",
  "conditions": [{
    "category": "bb",
    "diagCode": "cc",
    "diagCodeType": "D",
    "name": "Blood Pressure",
    "severity": "Medium"
  }],
  "id": 1,
  "organizationId": "NXP"
}

When I execute an SQL query such as "select * from /" on this data, I get the following exception:
scala.MatchError: (StringType,[[bb,cc,D,Blood Pressure,Medium]]) (of class scala.Tuple2)
at org.apache.spark.sql.json.JsonRDD$$anonfun$org$apache$spark$sql$json$JsonRDD$$valWriter$1$1.apply(JsonRDD.scala:451)
at org.apache.spark.sql.json.JsonRDD$$anonfun$org$apache$spark$sql$json$JsonRDD$$valWriter$1$1.apply(JsonRDD.scala:451)
at org.apache.spark.sql.json.JsonRDD$$anonfun$org$apache$spark$sql$json$JsonRDD$$valWriter$1$1$$anonfun$apply$7.apply(JsonRDD.scala:486)
at org.apache.spark.sql.json.JsonRDD$$anonfun$org$apache$spark$sql$json$JsonRDD$$valWriter$1$1$$anonfun$apply$7.apply(JsonRDD.scala:482)
at scala.collection.IndexedSeqOptimized$class.foreach(IndexedSeqOptimized.scala:33)
at scala.collection.mutable.ArrayOps$ofRef.foreach(ArrayOps.scala:108)
at org.apache.spark.sql.json.JsonRDD$$anonfun$org$apache$spark$sql$json$JsonRDD$$valWriter$1$1.apply(JsonRDD.scala:482)
at org.apache.spark.sql.json.JsonRDD$$anonfun$org$apache$spark$sql$json$JsonRDD$$valWriter$1$1.apply(JsonRDD.scala:451)
at org.apache.spark.sql.json.JsonRDD$$anonfun$org$apache$spark$sql$json$JsonRDD$$valWriter$1$1$$anonfun$apply$7.apply(JsonRDD.scala:486)
at org.apache.spark.sql.json.JsonRDD$$anonfun$org$apache$spark$sql$json$JsonRDD$$valWriter$1$1$$anonfun$apply$7.apply(JsonRDD.scala:482)
at scala.collection.IndexedSeqOptimized$class.foreach(IndexedSeqOptimized.scala:33)
at scala.collection.mutable.ArrayOps$ofRef.foreach(ArrayOps.scala:108)
at org.apache.spark.sql.json.JsonRDD$$anonfun$org$apache$spark$sql$json$JsonRDD$$valWriter$1$1.apply(JsonRDD.scala:482)
at org.apache.spark.sql.json.JsonRDD$$anonfun$org$apache$spark$sql$json$JsonRDD$$valWriter$1$1.apply(JsonRDD.scala:451)
at org.apache.spark.sql.json.JsonRDD$.rowToJSON(JsonRDD.scala:491)
at org.apache.spark.sql.DataFrame$$anonfun$toJSON$1$$anon$1.next(DataFrame.scala:1233)
at org.apache.spark.sql.DataFrame$$anonfun$toJSON$1$$anon$1.next(DataFrame.scala:1230)
at scala.collection.Iterator$class.foreach(Iterator.scala:727)
at org.apache.spark.sql.DataFrame$$anonfun$toJSON$1$$anon$1.foreach(DataFrame.scala:1230)
at scala.collection.generic.Growable$class.$plus$plus$eq(Growable.scala:48)
at scala.collection.mutable.ArrayBuffer.$plus$plus$eq(ArrayBuffer.scala:103)
at scala.collection.mutable.ArrayBuffer.$plus$plus$eq(ArrayBuffer.scala:47)
at scala.collection.TraversableOnce$class.to(TraversableOnce.scala:273)
at org.apache.spark.sql.DataFrame$$anonfun$toJSON$1$$anon$1.to(DataFrame.scala:1230)
at scala.collection.TraversableOnce$class.toBuffer(TraversableOnce.scala:265)
at org.apache.spark.sql.DataFrame$$anonfun$toJSON$1$$anon$1.toBuffer(DataFrame.scala:1230)
at scala.collection.TraversableOnce$class.toArray(TraversableOnce.scala:252)
at org.apache.spark.sql.DataFrame$$anonfun$toJSON$1$$anon$1.toArray(DataFrame.scala:1230)
at org.apache.spark.rdd.RDD$$anonfun$17.apply(RDD.scala:813)
at org.apache.spark.rdd.RDD$$anonfun$17.apply(RDD.scala:813)
at org.apache.spark.SparkContext$$anonfun$runJob$5.apply(SparkContext.scala:1498)
at org.apache.spark.SparkContext$$anonfun$runJob$5.apply(SparkContext.scala:1498)
at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:61)
at org.apache.spark.scheduler.Task.run(Task.scala:64)
at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:203)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
at java.lang.Thread.run(Thread.java:745)
I see no elasticsearch-spark code in the stack trace. My code fails exactly at the line hiveContext.sql(query).toJSON.collect().mkString(","). Is there something wrong with the kind of data I indexed? I tried reading and writing the same JSON in the Spark shell (using sqlContext to read the JSON and convert it to a JSON RDD), and that worked fine.

Thanks in advance.


(Costin Leau) #2

Why are you using HiveContext? The data is read from Elasticsearch, not Hive. Use SQLContext and the proper methods.

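By the looks of it, Spark chokes while converting the rows back to JSON (the trace goes through JsonRDD.rowToJSON): a field whose schema says StringType actually holds the nested conditions data. It is unclear, though, why that is.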
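The first line of the trace, scala.MatchError: (StringType,[[bb,cc,D,Blood Pressure,Medium]]) (of class scala.Tuple2), is what Scala throws when a pattern match over a (declared type, value) pair finds no matching case. The following is a simplified, hypothetical model of such a schema-driven JSON writer in plain Scala. The DataType classes, JsonWriterModel and Demo are illustrative stand-ins, not Spark's org.apache.spark.sql.types or JsonRDD code. It uses a reduced version of the posted document and shows that the same data serializes under a matching schema but throws a MatchError of the same shape when the schema declares the nested conditions field as a string.

```scala
// Hypothetical mini type model; NOT Spark's org.apache.spark.sql.types.
sealed trait DataType
case object StringType extends DataType
case object LongType extends DataType
final case class ArrayType(elementType: DataType) extends DataType
final case class StructType(fields: Seq[(String, DataType)]) extends DataType

// Simplified model of a writer that matches on (declared type, runtime value).
// Rows are modeled as plain Seqs of field values in schema order.
object JsonWriterModel {
  def write(dataType: DataType, value: Any): String = (dataType, value) match {
    case (_, null)                 => "null"
    case (StringType, v: String)   => "\"" + v + "\""
    case (LongType, v: Long)       => v.toString
    case (ArrayType(t), v: Seq[_]) => v.map(write(t, _)).mkString("[", ",", "]")
    case (StructType(fs), v: Seq[_]) =>
      fs.zip(v)
        .map { case ((name, t), fieldValue) => "\"" + name + "\":" + write(t, fieldValue) }
        .mkString("{", ",", "}")
    // Deliberately no catch-all case: a pair such as (StringType, <nested row>)
    // falls through and Scala throws scala.MatchError.
  }
}

object Demo {
  // Reduced version of the posted document (subset of its fields).
  val condition = StructType(Seq("category" -> StringType, "name" -> StringType))
  val goodSchema = StructType(Seq(
    "Name" -> StringType, "conditions" -> ArrayType(condition), "id" -> LongType))
  // Mismatched schema: "conditions" is declared as a plain string.
  val badSchema = StructType(Seq(
    "Name" -> StringType, "conditions" -> StringType, "id" -> LongType))
  val doc: Seq[Any] = Seq("ken1", Seq(Seq("bb", "Blood Pressure")), 1L)

  def main(args: Array[String]): Unit = {
    println(JsonWriterModel.write(goodSchema, doc))
    try println(JsonWriterModel.write(badSchema, doc))
    catch { case e: MatchError => println("scala.MatchError: " + e.getMessage) }
  }
}
```

Running Demo.main prints the JSON for the matching schema and then the MatchError message for the mismatched one. Because the writer has no fallback case, a schema that disagrees with the actual values only surfaces at serialization time, and as a bare MatchError rather than a descriptive error.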

P.S. Please use some formatting next time; your post is unreadable.


(Sandeep Gunnam) #3

Thanks for your response, Costin, and sorry about the formatting (I'm not sure how to correct it, though). I used HiveContext because that object is already available in the code I am integrating with. As you suggested, I will try SQLContext.


(Costin Leau) #4

HiveContext extends SQLContext; however, internally it is designed to work on Hive (as the name suggests). Using SQLContext is more appropriate since there is no Hive underneath. Also, since you have a HiveContext, you should be able to get access to the underlying SQLContext, or you can simply create a new one through new SQLContext(sparkContext).

