Hi
I am running Spark 1.3.1 in local mode, and my Scala version is 2.10. I have some data indexed into Elasticsearch and am trying to read it using the hiveContext.esDF method. It works for other schemas of data but fails for the one shown below:
{
  "Name": "ken1",
  "businessCategory": "BMS",
  "conditions": [{
    "category": "bb",
    "diagCode": "cc",
    "diagCodeType": "D",
    "name": "Blood Pressure",
    "severity": "Medium"
  }],
  "id": 1,
  "organizationId": "NXP"
}
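For context, the reading side of my code looks roughly like this (a minimal sketch; the index/type name "myindex/mytype" and the temp table name "docs" are placeholders, not my actual values):

import org.elasticsearch.spark.sql._

// Create a DataFrame over the Elasticsearch index and register it for SQL
// ("myindex/mytype" is a placeholder for my real index/type).
val df = hiveContext.esDF("myindex/mytype")
df.registerTempTable("docs")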
When I try to execute a SQL query on this data, such as "select * from /", I get the exception shown below:
scala.MatchError: (StringType,[[bb,cc,D,Blood Pressure,Medium]]) (of class scala.Tuple2)
at org.apache.spark.sql.json.JsonRDD$$anonfun$org$apache$spark$sql$json$JsonRDD$$valWriter$1$1.apply(JsonRDD.scala:451)
at org.apache.spark.sql.json.JsonRDD$$anonfun$org$apache$spark$sql$json$JsonRDD$$valWriter$1$1.apply(JsonRDD.scala:451)
at org.apache.spark.sql.json.JsonRDD$$anonfun$org$apache$spark$sql$json$JsonRDD$$valWriter$1$1$$anonfun$apply$7.apply(JsonRDD.scala:486)
at org.apache.spark.sql.json.JsonRDD$$anonfun$org$apache$spark$sql$json$JsonRDD$$valWriter$1$1$$anonfun$apply$7.apply(JsonRDD.scala:482)
at scala.collection.IndexedSeqOptimized$class.foreach(IndexedSeqOptimized.scala:33)
at scala.collection.mutable.ArrayOps$ofRef.foreach(ArrayOps.scala:108)
at org.apache.spark.sql.json.JsonRDD$$anonfun$org$apache$spark$sql$json$JsonRDD$$valWriter$1$1.apply(JsonRDD.scala:482)
at org.apache.spark.sql.json.JsonRDD$$anonfun$org$apache$spark$sql$json$JsonRDD$$valWriter$1$1.apply(JsonRDD.scala:451)
at org.apache.spark.sql.json.JsonRDD$$anonfun$org$apache$spark$sql$json$JsonRDD$$valWriter$1$1$$anonfun$apply$7.apply(JsonRDD.scala:486)
at org.apache.spark.sql.json.JsonRDD$$anonfun$org$apache$spark$sql$json$JsonRDD$$valWriter$1$1$$anonfun$apply$7.apply(JsonRDD.scala:482)
at scala.collection.IndexedSeqOptimized$class.foreach(IndexedSeqOptimized.scala:33)
at scala.collection.mutable.ArrayOps$ofRef.foreach(ArrayOps.scala:108)
at org.apache.spark.sql.json.JsonRDD$$anonfun$org$apache$spark$sql$json$JsonRDD$$valWriter$1$1.apply(JsonRDD.scala:482)
at org.apache.spark.sql.json.JsonRDD$$anonfun$org$apache$spark$sql$json$JsonRDD$$valWriter$1$1.apply(JsonRDD.scala:451)
at org.apache.spark.sql.json.JsonRDD$.rowToJSON(JsonRDD.scala:491)
at org.apache.spark.sql.DataFrame$$anonfun$toJSON$1$$anon$1.next(DataFrame.scala:1233)
at org.apache.spark.sql.DataFrame$$anonfun$toJSON$1$$anon$1.next(DataFrame.scala:1230)
at scala.collection.Iterator$class.foreach(Iterator.scala:727)
at org.apache.spark.sql.DataFrame$$anonfun$toJSON$1$$anon$1.foreach(DataFrame.scala:1230)
at scala.collection.generic.Growable$class.$plus$plus$eq(Growable.scala:48)
at scala.collection.mutable.ArrayBuffer.$plus$plus$eq(ArrayBuffer.scala:103)
at scala.collection.mutable.ArrayBuffer.$plus$plus$eq(ArrayBuffer.scala:47)
at scala.collection.TraversableOnce$class.to(TraversableOnce.scala:273)
at org.apache.spark.sql.DataFrame$$anonfun$toJSON$1$$anon$1.to(DataFrame.scala:1230)
at scala.collection.TraversableOnce$class.toBuffer(TraversableOnce.scala:265)
at org.apache.spark.sql.DataFrame$$anonfun$toJSON$1$$anon$1.toBuffer(DataFrame.scala:1230)
at scala.collection.TraversableOnce$class.toArray(TraversableOnce.scala:252)
at org.apache.spark.sql.DataFrame$$anonfun$toJSON$1$$anon$1.toArray(DataFrame.scala:1230)
at org.apache.spark.rdd.RDD$$anonfun$17.apply(RDD.scala:813)
at org.apache.spark.rdd.RDD$$anonfun$17.apply(RDD.scala:813)
at org.apache.spark.SparkContext$$anonfun$runJob$5.apply(SparkContext.scala:1498)
at org.apache.spark.SparkContext$$anonfun$runJob$5.apply(SparkContext.scala:1498)
at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:61)
at org.apache.spark.scheduler.Task.run(Task.scala:64)
at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:203)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
at java.lang.Thread.run(Thread.java:745)
I see no elasticsearch-spark code in the stack trace. The failure occurs exactly at the line hiveContext.sql(query).toJSON.collect().mkString(",") in my code. Do you see something wrong with the kind of data I indexed? I tried reading and writing the same JSON using the spark shell (using sqlContext to read the JSON and convert it to a JSON RDD), and that worked fine.
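Roughly, the spark-shell check looked like this (a sketch; the file path is a placeholder):

// Read the same JSON document from a local file and run the same query path
// ("/tmp/sample.json" is a placeholder for the file I used).
val jsonDF = sqlContext.jsonFile("/tmp/sample.json")
jsonDF.registerTempTable("docs")
sqlContext.sql("select * from docs").toJSON.collect().mkString(",")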
Thanks in advance.