Spark-sql does not seem to read from a nested schema

Hi,

I tried versions 2.1.0 and 2.2.SNAPSHOT, and it seems that when I have a nested schema, spark-sql is not able to read the data.

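import java.util.Map;

import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.sql.DataFrame;
import org.apache.spark.sql.SQLContext;
import org.elasticsearch.spark.rdd.api.java.JavaEsSpark;
import org.elasticsearch.spark.sql.api.java.JavaEsSparkSQL;

public class SimpleApp {
    public static void main(String[] args) {
        SparkConf conf = new SparkConf().setAppName("es").setMaster("local");
        JavaSparkContext jsc = new JavaSparkContext(conf);
        JavaRDD<Map<String, Object>> esRDD1 = JavaEsSpark.esRDD(jsc, "myindex/test").values();
        for (Map<String, Object> obj : esRDD1.collect()) {
            System.out.println(obj);   // prints all rows
        }
        SQLContext sql = new SQLContext(jsc);
        DataFrame test = JavaEsSparkSQL.esDF(sql, "myindex/test");
        test.printSchema(); // prints without nested fields
        test.show();  // fails to read nested fields
    }
}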

Data in myindex/test

{id=1, ts=2, timestamp=1424987184887}
{id=2, ts=21, session={user=foo, name=bar}, timestamp=1424987184887}

test.printSchema prints:

root
 |-- id: long (nullable = true)
 |-- timestamp: string (nullable = true)
 |-- ts: long (nullable = true)

test.show() throws this exception:

15/06/30 17:13:32 ERROR Executor: Exception in task 2.0 in stage 2.0 (TID 8)
org.elasticsearch.hadoop.EsHadoopIllegalStateException: Field 'session' not found; typically this occurs with arrays which are not mapped as single value
	at org.elasticsearch.spark.sql.RowValueReader$class.rowOrder(RowValueReader.scala:26)
	at org.elasticsearch.spark.sql.ScalaRowValueReader.rowOrder(ScalaEsRowValueReader.scala:13)
	at org.elasticsearch.spark.sql.ScalaRowValueReader.createMap(ScalaEsRowValueReader.scala:32)
	at org.elasticsearch.hadoop.serialization.ScrollReader.map(ScrollReader.java:620)
	at org.elasticsearch.hadoop.serialization.ScrollReader.read(ScrollReader.java:559)
	at org.elasticsearch.hadoop.serialization.ScrollReader.map(ScrollReader.java:636)
	at org.elasticsearch.hadoop.serialization.ScrollReader.read(ScrollReader.java:559)
	at org.elasticsearch.hadoop.serialization.ScrollReader.readHitAsMap(ScrollReader.java:358)
	at org.elasticsearch.hadoop.serialization.ScrollReader.readHit(ScrollReader.java:293)
	at org.elasticsearch.hadoop.serialization.ScrollReader.read(ScrollReader.java:188)
	at org.elasticsearch.hadoop.serialization.ScrollReader.read(ScrollReader.java:167)
	at org.elasticsearch.hadoop.rest.RestRepository.scroll(RestRepository.java:403)
	at org.elasticsearch.hadoop.rest.ScrollQuery.hasNext(ScrollQuery.java:76)
	at org.elasticsearch.spark.rdd.AbstractEsRDDIterator.hasNext(AbstractEsRDDIterator.scala:43)
	at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:327)
	at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:327)
	at scala.collection.Iterator$$anon$10.hasNext(Iterator.scala:308)
	at scala.collection.Iterator$class.foreach(Iterator.scala:727)
	at scala.collection.AbstractIterator.foreach(Iterator.scala:1157)
	at scala.collection.generic.Growable$class.$plus$plus$eq(Growable.scala:48)
	at scala.collection.mutable.ArrayBuffer.$plus$plus$eq(ArrayBuffer.scala:103)
	at scala.collection.mutable.ArrayBuffer.$plus$plus$eq(ArrayBuffer.scala:47)
	at scala.collection.TraversableOnce$class.to(TraversableOnce.scala:273)
	at scala.collection.AbstractIterator.to(Iterator.scala:1157)
	at scala.collection.TraversableOnce$class.toBuffer(TraversableOnce.scala:265)
	at scala.collection.AbstractIterator.toBuffer(Iterator.scala:1157)
	at scala.collection.TraversableOnce$class.toArray(TraversableOnce.scala:252)
	at scala.collection.AbstractIterator.toArray(Iterator.scala:1157)
	at org.apache.spark.sql.execution.SparkPlan$$anonfun$3.apply(SparkPlan.scala:143)
	at org.apache.spark.sql.execution.SparkPlan$$anonfun$3.apply(SparkPlan.scala:143)
	at org.apache.spark.SparkContext$$anonfun$runJob$5.apply(SparkContext.scala:1765)
	at org.apache.spark.SparkContext$$anonfun$runJob$5.apply(SparkContext.scala:1765)
	at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:63)
	at org.apache.spark.scheduler.Task.run(Task.scala:70)
	at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:213)
	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
	at java.lang.Thread.run(Thread.java:745)
15/06/30 17:13:32 INFO TaskSetManager: Starting task 3.0 in stage 2.0 (TID 9, localhost, ANY, 3328 bytes)
15/06/30 17:13:32 INFO Executor: Running task 3.0 in stage 2.0 (TID 9)
15/06/30 17:13:32 WARN TaskSetManager: Lost task 2.0 in stage 2.0 (TID 8, localhost): org.elasticsearch.hadoop.EsHadoopIllegalStateException: Field 'session' not found; typically this occurs with arrays which are not mapped as single value
	at org.elasticsearch.spark.sql.RowValueReader$class.rowOrder(RowValueReader.scala:26)
	at org.elasticsearch.spark.sql.ScalaRowValueReader.rowOrder(ScalaEsRowValueReader.scala:13)
	at org.elasticsearch.spark.sql.ScalaRowValueReader.createMap(ScalaEsRowValueReader.scala:32)
	at org.elasticsearch.hadoop.serialization.ScrollReader.map(ScrollReader.java:620)
	at org.elasticsearch.hadoop.serialization.ScrollReader.read(ScrollReader.java:559)
	at org.elasticsearch.hadoop.serialization.ScrollReader.map(ScrollReader.java:636)
	at org.elasticsearch.hadoop.serialization.ScrollReader.read(ScrollReader.java:559)
	at org.elasticsearch.hadoop.serialization.ScrollReader.readHitAsMap(ScrollReader.java:358)
	at org.elasticsearch.hadoop.serialization.ScrollReader.readHit(ScrollReader.java:293)
	at org.elasticsearch.hadoop.serialization.ScrollReader.read(ScrollReader.java:188)
	at org.elasticsearch.hadoop.serialization.ScrollReader.read(ScrollReader.java:167)
	at org.elasticsearch.hadoop.rest.RestRepository.scroll(RestRepository.java:403)
	at org.elasticsearch.hadoop.rest.ScrollQuery.hasNext(ScrollQuery.java:76)
	at org.elasticsearch.spark.rdd.AbstractEsRDDIterator.hasNext(AbstractEsRDDIterator.scala:43)
	at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:327)
	at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:327)
	at scala.collection.Iterator$$anon$10.hasNext(Iterator.scala:308)
	at scala.collection.Iterator$class.foreach(Iterator.scala:727)
	at scala.collection.AbstractIterator.foreach(Iterator.scala:1157)
	at scala.collection.generic.Growable$class.$plus$plus$eq(Growable.scala:48)
	at scala.collection.mutable.ArrayBuffer.$plus$plus$eq(ArrayBuffer.scala:103)
	at scala.collection.mutable.ArrayBuffer.$plus$plus$eq(ArrayBuffer.scala:47)
	at scala.collection.TraversableOnce$class.to(TraversableOnce.scala:273)
	at scala.collection.AbstractIterator.to(Iterator.scala:1157)
	at scala.collection.TraversableOnce$class.toBuffer(TraversableOnce.scala:265)
	at scala.collection.AbstractIterator.toBuffer(Iterator.scala:1157)
	at scala.collection.TraversableOnce$class.toArray(TraversableOnce.scala:252)
	at scala.collection.AbstractIterator.toArray(Iterator.scala:1157)
	at org.apache.spark.sql.execution.SparkPlan$$anonfun$3.apply(SparkPlan.scala:143)
	at org.apache.spark.sql.execution.SparkPlan$$anonfun$3.apply(SparkPlan.scala:143)
	at org.apache.spark.SparkContext$$anonfun$runJob$5.apply(SparkContext.scala:1765)
	at org.apache.spark.SparkContext$$anonfun$runJob$5.apply(SparkContext.scala:1765)
	at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:63)
	at org.apache.spark.scheduler.Task.run(Task.scala:70)
	at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:213)
	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
	at java.lang.Thread.run(Thread.java:745)

15/06/30 17:13:32 ERROR TaskSetManager: Task 2 in stage 2.0 failed 1 times; aborting job
15/06/30 17:13:32 INFO Executor: Finished task 3.0 in stage 2.0 (TID 9). 628 bytes result sent to driver
15/06/30 17:13:32 INFO TaskSchedulerImpl: Cancelling stage 2
15/06/30 17:13:32 INFO TaskSchedulerImpl: Removed TaskSet 2.0, whose tasks have all completed, from pool 
15/06/30 17:13:32 INFO TaskSchedulerImpl: Stage 2 was cancelled
15/06/30 17:13:32 INFO TaskSetManager: Finished task 3.0 in stage 2.0 (TID 9) in 7 ms on localhost (3/4)
15/06/30 17:13:32 INFO DAGScheduler: ResultStage 2 (show at SimpleApp.java:30) failed in 0.043 s
15/06/30 17:13:32 INFO TaskSchedulerImpl: Removed TaskSet 2.0, whose tasks have all completed, from pool 
15/06/30 17:13:32 INFO DAGScheduler: Job 2 failed: show at SimpleApp.java:30, took 0.050427 s
Exception in thread "main" org.apache.spark.SparkException: Job aborted due to stage failure: Task 2 in stage 2.0 failed 1 times, most recent failure: Lost task 2.0 in stage 2.0 (TID 8, localhost): org.elasticsearch.hadoop.EsHadoopIllegalStateException: Field 'session' not found; typically this occurs with arrays which are not mapped as single value
	at org.elasticsearch.spark.sql.RowValueReader$class.rowOrder(RowValueReader.scala:26)
	at org.elasticsearch.spark.sql.ScalaRowValueReader.rowOrder(ScalaEsRowValueReader.scala:13)
	at org.elasticsearch.spark.sql.ScalaRowValueReader.createMap(ScalaEsRowValueReader.scala:32)
	at org.elasticsearch.hadoop.serialization.ScrollReader.map(ScrollReader.java:620)
	at org.elasticsearch.hadoop.serialization.ScrollReader.read(ScrollReader.java:559)
	at org.elasticsearch.hadoop.serialization.ScrollReader.map(ScrollReader.java:636)
	at org.elasticsearch.hadoop.serialization.ScrollReader.read(ScrollReader.java:559)
	at org.elasticsearch.hadoop.serialization.ScrollReader.readHitAsMap(ScrollReader.java:358)
	at org.elasticsearch.hadoop.serialization.ScrollReader.readHit(ScrollReader.java:293)
	at org.elasticsearch.hadoop.serialization.ScrollReader.read(ScrollReader.java:188)
	at org.elasticsearch.hadoop.serialization.ScrollReader.read(ScrollReader.java:167)
	at org.elasticsearch.hadoop.rest.RestRepository.scroll(RestRepository.java:403)
	at org.elasticsearch.hadoop.rest.ScrollQuery.hasNext(ScrollQuery.java:76)
	at org.elasticsearch.spark.rdd.AbstractEsRDDIterator.hasNext(AbstractEsRDDIterator.scala:43)
	at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:327)
	at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:327)
	at scala.collection.Iterator$$anon$10.hasNext(Iterator.scala:308)
	at scala.collection.Iterator$class.foreach(Iterator.scala:727)
	at scala.collection.AbstractIterator.foreach(Iterator.scala:1157)
	at scala.collection.generic.Growable$class.$plus$plus$eq(Growable.scala:48)
	at scala.collection.mutable.ArrayBuffer.$plus$plus$eq(ArrayBuffer.scala:103)
	at scala.collection.mutable.ArrayBuffer.$plus$plus$eq(ArrayBuffer.scala:47)
	at scala.collection.TraversableOnce$class.to(TraversableOnce.scala:273)
	at scala.collection.AbstractIterator.to(Iterator.scala:1157)
	at scala.collection.TraversableOnce$class.toBuffer(TraversableOnce.scala:265)
	at scala.collection.AbstractIterator.toBuffer(Iterator.scala:1157)
	at scala.collection.TraversableOnce$class.toArray(TraversableOnce.scala:252)
	at scala.collection.AbstractIterator.toArray(Iterator.scala:1157)
	at org.apache.spark.sql.execution.SparkPlan$$anonfun$3.apply(SparkPlan.scala:143)
	at org.apache.spark.sql.execution.SparkPlan$$anonfun$3.apply(SparkPlan.scala:143)
	at org.apache.spark.SparkContext$$anonfun$runJob$5.apply(SparkContext.scala:1765)
	at org.apache.spark.SparkContext$$anonfun$runJob$5.apply(SparkContext.scala:1765)
	at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:63)
	at org.apache.spark.scheduler.Task.run(Task.scala:70)
	at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:213)
	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
	at java.lang.Thread.run(Thread.java:745)

Driver stacktrace:
	at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$failJobAndIndependentStages(DAGScheduler.scala:1266)
	at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1257)
	at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1256)
	at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
	at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47)
	at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:1256)
	at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:730)
	at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:730)
	at scala.Option.foreach(Option.scala:236)
	at org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:730)
	at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1450)
	at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1411)
	at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:48)
15/06/30 17:13:32 INFO SparkContext: Invoking stop() from shutdown hook

I switched to 2.2.0.BUILD-SNAPSHOT and I don't see this issue anymore.

Thanks.

That is strange, considering the changes in 2.2.0 are minimal so far compared to 2.1.0. Can you double-check that you don't have multiple jars on the classpath, so that an old version of Elasticsearch is being used instead?

Cheers,
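
One way to run that classpath check from inside the application is to ask the class loader for every copy of a class it can see. The following is only a minimal sketch: `ClasspathCheck` and `copiesOf` are hypothetical names introduced here, and the default class name is simply taken from the stack trace above. More than one printed URL would suggest that several jars provide the same class.

```java
import java.io.IOException;
import java.io.UncheckedIOException;
import java.net.URL;
import java.util.Collections;
import java.util.List;

// Hypothetical helper, not part of Spark or elasticsearch-hadoop.
final class ClasspathCheck {

    private ClasspathCheck() {}

    // Returns the location of every copy of the given class that is
    // visible to the class loader that loaded this helper.
    static List<URL> copiesOf(String className) {
        String resource = className.replace('.', '/') + ".class";
        ClassLoader loader = ClasspathCheck.class.getClassLoader();
        try {
            return Collections.list(loader.getResources(resource));
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    public static void main(String[] args) {
        // Class name copied from the stack trace in this thread.
        String name = args.length > 0
                ? args[0]
                : "org.elasticsearch.hadoop.serialization.ScrollReader";
        for (URL url : copiesOf(name)) {
            System.out.println(url);
        }
    }
}
```

Run it with the same classpath as the Spark job; if nothing is printed, the class is not visible to that class loader at all.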

Hi @costin,

I cleaned up the unnecessary jars.
I checked with a single nested object in the schema, and it worked.
However, it still fails when I have a list/array of objects in the schema.

This is the data I have in Elasticsearch:

POST index1/type1/1
{
    "id": 1,
    "name": "test",
    "ObjArr": [
        {
            "userData": {
                "name": "user1"
            }
        },
        {
            "userData": {
                "name": "user1"
            }
        }
    ]
}

I see the same error for a query like "select * from type1":

Running query >>> select * from type1
Using Spark's default log4j profile: org/apache/spark/log4j-defaults.properties
15/07/14 16:45:41 INFO SparkContext: Running Spark version 1.4.0
15/07/14 16:45:42 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
15/07/14 16:45:42 INFO SecurityManager: Changing view acls to: puneet.jaiswal
15/07/14 16:45:42 INFO SecurityManager: Changing modify acls to: puneet.jaiswal
15/07/14 16:45:42 INFO SecurityManager: SecurityManager: authentication disabled; ui acls disabled; users with view permissions: Set(puneet.jaiswal); users with modify permissions: Set(puneet.jaiswal)
15/07/14 16:45:42 INFO Slf4jLogger: Slf4jLogger started
15/07/14 16:45:42 INFO Remoting: Starting remoting
15/07/14 16:45:42 INFO Remoting: Remoting started; listening on addresses :[akka.tcp://sparkDriver@192.168.0.16:52998]
15/07/14 16:45:42 INFO Utils: Successfully started service 'sparkDriver' on port 52998.
15/07/14 16:45:42 INFO SparkEnv: Registering MapOutputTracker
15/07/14 16:45:42 INFO SparkEnv: Registering BlockManagerMaster
15/07/14 16:45:42 INFO DiskBlockManager: Created local directory at /private/var/folders/wq/qbdzhygn3sz4vpd_9r3nys4dkyxdqy/T/spark-9dfdf0ab-d148-4fe5-b774-67a038c85998/blockmgr-a62cc2d7-6268-4f26-bbe5-1241014e1c71
15/07/14 16:45:42 INFO MemoryStore: MemoryStore started with capacity 1966.1 MB
15/07/14 16:45:42 INFO HttpFileServer: HTTP File server directory is /private/var/folders/wq/qbdzhygn3sz4vpd_9r3nys4dkyxdqy/T/spark-9dfdf0ab-d148-4fe5-b774-67a038c85998/httpd-37528b3b-4d00-4fce-8b56-7d88f7582458
15/07/14 16:45:42 INFO HttpServer: Starting HTTP Server
15/07/14 16:45:42 INFO Utils: Successfully started service 'HTTP file server' on port 52999.
15/07/14 16:45:42 INFO SparkEnv: Registering OutputCommitCoordinator
15/07/14 16:45:42 INFO Utils: Successfully started service 'SparkUI' on port 4040.
15/07/14 16:45:42 INFO SparkUI: Started SparkUI at http://192.168.0.16:4040
15/07/14 16:45:42 INFO SparkContext: Added JAR ./lib/elasticsearch-spark_2.10-2.2.0.BUILD-20150714.024220-21.jar at http://192.168.0.16:52999/jars/elasticsearch-spark_2.10-2.2.0.BUILD-20150714.024220-21.jar with timestamp 1436917542948
15/07/14 16:45:42 INFO Executor: Starting executor ID driver on host localhost
15/07/14 16:45:43 INFO Utils: Successfully started service 'org.apache.spark.network.netty.NettyBlockTransferService' on port 53000.
15/07/14 16:45:43 INFO NettyBlockTransferService: Server created on 53000
15/07/14 16:45:43 INFO BlockManagerMaster: Trying to register BlockManager
15/07/14 16:45:43 INFO BlockManagerMasterEndpoint: Registering block manager localhost:53000 with 1966.1 MB RAM, BlockManagerId(driver, localhost, 53000)
15/07/14 16:45:43 INFO BlockManagerMaster: Registered BlockManager
preparing read from schema: index1/type1, table:type1
root
 |-- ObjArr: struct (nullable = true)
 |    |-- userData: struct (nullable = true)
 |    |    |-- name: string (nullable = true)
 |-- id: long (nullable = true)
 |-- name: string (nullable = true)

15/07/14 16:45:44 INFO Exchange: Using SparkSqlSerializer2.
15/07/14 16:45:44 INFO SparkContext: Starting job: count at SimpleApp.java:87
15/07/14 16:45:44 INFO Version: Elasticsearch Hadoop v2.2.0.BUILD-SNAPSHOT [a4c367b4f3]
15/07/14 16:45:44 INFO ScalaEsRowRDD: Reading from [index1/type1]
15/07/14 16:45:44 INFO ScalaEsRowRDD: Discovered mapping {index1=[mappings=[type1=[ObjArr=[userData=[name=STRING]], id=LONG, name=STRING]]]} for [index1/type1]
15/07/14 16:45:44 INFO DAGScheduler: Registering RDD 4 (count at SimpleApp.java:87)
15/07/14 16:45:44 INFO DAGScheduler: Got job 0 (count at SimpleApp.java:87) with 1 output partitions (allowLocal=false)
15/07/14 16:45:44 INFO DAGScheduler: Final stage: ResultStage 1(count at SimpleApp.java:87)
15/07/14 16:45:44 INFO DAGScheduler: Parents of final stage: List(ShuffleMapStage 0)
15/07/14 16:45:44 INFO DAGScheduler: Missing parents: List(ShuffleMapStage 0)
15/07/14 16:45:44 INFO DAGScheduler: Submitting ShuffleMapStage 0 (MapPartitionsRDD[4] at count at SimpleApp.java:87), which has no missing parents
15/07/14 16:45:44 INFO MemoryStore: ensureFreeSpace(8424) called with curMem=0, maxMem=2061647216
15/07/14 16:45:44 INFO MemoryStore: Block broadcast_0 stored as values in memory (estimated size 8.2 KB, free 1966.1 MB)
15/07/14 16:45:44 INFO MemoryStore: ensureFreeSpace(4134) called with curMem=8424, maxMem=2061647216
15/07/14 16:45:44 INFO MemoryStore: Block broadcast_0_piece0 stored as bytes in memory (estimated size 4.0 KB, free 1966.1 MB)
15/07/14 16:45:44 INFO BlockManagerInfo: Added broadcast_0_piece0 in memory on localhost:53000 (size: 4.0 KB, free: 1966.1 MB)
15/07/14 16:45:44 INFO SparkContext: Created broadcast 0 from broadcast at DAGScheduler.scala:874
15/07/14 16:45:44 INFO DAGScheduler: Submitting 5 missing tasks from ShuffleMapStage 0 (MapPartitionsRDD[4] at count at SimpleApp.java:87)
15/07/14 16:45:44 INFO TaskSchedulerImpl: Adding task set 0.0 with 5 tasks
15/07/14 16:45:44 INFO TaskSetManager: Starting task 0.0 in stage 0.0 (TID 0, localhost, ANY, 3480 bytes)
15/07/14 16:45:44 INFO Executor: Running task 0.0 in stage 0.0 (TID 0)
15/07/14 16:45:44 INFO Executor: Fetching http://192.168.0.16:52999/jars/elasticsearch-spark_2.10-2.2.0.BUILD-20150714.024220-21.jar with timestamp 1436917542948
15/07/14 16:45:44 INFO Utils: Fetching http://192.168.0.16:52999/jars/elasticsearch-spark_2.10-2.2.0.BUILD-20150714.024220-21.jar to /private/var/folders/wq/qbdzhygn3sz4vpd_9r3nys4dkyxdqy/T/spark-9dfdf0ab-d148-4fe5-b774-67a038c85998/userFiles-b873c204-d439-409c-8992-d5894a76205d/fetchFileTemp7264345150258615091.tmp
15/07/14 16:45:44 INFO Executor: Adding file:/private/var/folders/wq/qbdzhygn3sz4vpd_9r3nys4dkyxdqy/T/spark-9dfdf0ab-d148-4fe5-b774-67a038c85998/userFiles-b873c204-d439-409c-8992-d5894a76205d/elasticsearch-spark_2.10-2.2.0.BUILD-20150714.024220-21.jar to class loader
15/07/14 16:45:45 INFO Executor: Finished task 0.0 in stage 0.0 (TID 0). 879 bytes result sent to driver
15/07/14 16:45:45 INFO TaskSetManager: Starting task 1.0 in stage 0.0 (TID 1, localhost, ANY, 3480 bytes)
15/07/14 16:45:45 INFO Executor: Running task 1.0 in stage 0.0 (TID 1)
15/07/14 16:45:45 INFO TaskSetManager: Finished task 0.0 in stage 0.0 (TID 0) in 238 ms on localhost (1/5)
15/07/14 16:45:45 INFO Executor: Finished task 1.0 in stage 0.0 (TID 1). 879 bytes result sent to driver
15/07/14 16:45:45 INFO TaskSetManager: Starting task 2.0 in stage 0.0 (TID 2, localhost, ANY, 3480 bytes)
15/07/14 16:45:45 INFO Executor: Running task 2.0 in stage 0.0 (TID 2)
15/07/14 16:45:45 INFO TaskSetManager: Finished task 1.0 in stage 0.0 (TID 1) in 27 ms on localhost (2/5)
15/07/14 16:45:45 ERROR Executor: Exception in task 2.0 in stage 0.0 (TID 2)
org.elasticsearch.hadoop.EsHadoopIllegalStateException: Field 'name' not found; typically this occurs with arrays which are not mapped as single value
	at org.elasticsearch.spark.sql.RowValueReader$class.rowOrder(RowValueReader.scala:26)
	at org.elasticsearch.spark.sql.ScalaRowValueReader.rowOrder(ScalaEsRowValueReader.scala:13)
	at org.elasticsearch.spark.sql.ScalaRowValueReader.createMap(ScalaEsRowValueReader.scala:32)
	at org.elasticsearch.hadoop.serialization.ScrollReader.map(ScrollReader.java:620)
	at org.elasticsearch.hadoop.serialization.ScrollReader.read(ScrollReader.java:559)
	at org.elasticsearch.hadoop.serialization.ScrollReader.list(ScrollReader.java:600)
	at org.elasticsearch.hadoop.serialization.ScrollReader.read(ScrollReader.java:562)
	at org.elasticsearch.hadoop.serialization.ScrollReader.map(ScrollReader.java:636)
	at org.elasticsearch.hadoop.serialization.ScrollReader.read(ScrollReader.java:559)
	at org.elasticsearch.hadoop.serialization.ScrollReader.readHitAsMap(ScrollReader.java:358)
	at org.elasticsearch.hadoop.serialization.ScrollReader.readHit(ScrollReader.java:293)
	at org.elasticsearch.hadoop.serialization.ScrollReader.read(ScrollReader.java:188)
	at org.elasticsearch.hadoop.serialization.ScrollReader.read(ScrollReader.java:167)
	at org.elasticsearch.hadoop.rest.RestRepository.scroll(RestRepository.java:403)
	at org.elasticsearch.hadoop.rest.ScrollQuery.hasNext(ScrollQuery.java:76)
	at org.elasticsearch.spark.rdd.AbstractEsRDDIterator.hasNext(AbstractEsRDDIterator.scala:43)
	at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:327)
	at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:327)
	at org.apache.spark.sql.execution.Aggregate$$anonfun$doExecute$1$$anonfun$6.apply(Aggregate.scala:129)
	at org.apache.spark.sql.execution.Aggregate$$anonfun$doExecute$1$$anonfun$6.apply(Aggregate.scala:126)
	at org.apache.spark.rdd.RDD$$anonfun$mapPartitions$1$$anonfun$apply$17.apply(RDD.scala:686)
	at org.apache.spark.rdd.RDD$$anonfun$mapPartitions$1$$anonfun$apply$17.apply(RDD.scala:686)
	at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:35)
	at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:277)
	at org.apache.spark.rdd.RDD.iterator(RDD.scala:244)
	at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:35)
	at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:277)
	at org.apache.spark.rdd.RDD.iterator(RDD.scala:244)
	at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:70)
	at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:41)
	at org.apache.spark.scheduler.Task.run(Task.scala:70)
	at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:213)
	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
	at java.lang.Thread.run(Thread.java:745)
15/07/14 16:45:45 INFO TaskSetManager: Starting task 3.0 in stage 0.0 (TID 3, localhost, ANY, 3480 bytes)
15/07/14 16:45:45 INFO Executor: Running task 3.0 in stage 0.0 (TID 3)
15/07/14 16:45:45 WARN TaskSetManager: Lost task 2.0 in stage 0.0 (TID 2, localhost): org.elasticsearch.hadoop.EsHadoopIllegalStateException: Field 'name' not found; typically this occurs with arrays which are not mapped as single value
	at org.elasticsearch.spark.sql.RowValueReader$class.rowOrder(RowValueReader.scala:26)
	at org.elasticsearch.spark.sql.ScalaRowValueReader.rowOrder(ScalaEsRowValueReader.scala:13)
	at org.elasticsearch.spark.sql.ScalaRowValueReader.createMap(ScalaEsRowValueReader.scala:32)
	at org.elasticsearch.hadoop.serialization.ScrollReader.map(ScrollReader.java:620)
	at org.elasticsearch.hadoop.serialization.ScrollReader.read(ScrollReader.java:559)
	at org.elasticsearch.hadoop.serialization.ScrollReader.list(ScrollReader.java:600)
	at org.elasticsearch.hadoop.serialization.ScrollReader.read(ScrollReader.java:562)
	at org.elasticsearch.hadoop.serialization.ScrollReader.map(ScrollReader.java:636)
	at org.elasticsearch.hadoop.serialization.ScrollReader.read(ScrollReader.java:559)
	at org.elasticsearch.hadoop.serialization.ScrollReader.readHitAsMap(ScrollReader.java:358)
	at org.elasticsearch.hadoop.serialization.ScrollReader.readHit(ScrollReader.java:293)
	at org.elasticsearch.hadoop.serialization.ScrollReader.read(ScrollReader.java:188)
	at org.elasticsearch.hadoop.serialization.ScrollReader.read(ScrollReader.java:167)
	at org.elasticsearch.hadoop.rest.RestRepository.scroll(RestRepository.java:403)
	at org.elasticsearch.hadoop.rest.ScrollQuery.hasNext(ScrollQuery.java:76)
	at org.elasticsearch.spark.rdd.AbstractEsRDDIterator.hasNext(AbstractEsRDDIterator.scala:43)
	at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:327)
	at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:327)
	at org.apache.spark.sql.execution.Aggregate$$anonfun$doExecute$1$$anonfun$6.apply(Aggregate.scala:129)
	at org.apache.spark.sql.execution.Aggregate$$anonfun$doExecute$1$$anonfun$6.apply(Aggregate.scala:126)
	at org.apache.spark.rdd.RDD$$anonfun$mapPartitions$1$$anonfun$apply$17.apply(RDD.scala:686)
	at org.apache.spark.rdd.RDD$$anonfun$mapPartitions$1$$anonfun$apply$17.apply(RDD.scala:686)
	at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:35)
	at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:277)
	at org.apache.spark.rdd.RDD.iterator(RDD.scala:244)
	at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:35)
	at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:277)
	at org.apache.spark.rdd.RDD.iterator(RDD.scala:244)
	at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:70)
	at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:41)
	at org.apache.spark.scheduler.Task.run(Task.scala:70)
	at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:213)
	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
	at java.lang.Thread.run(Thread.java:745)

15/07/14 16:45:45 ERROR TaskSetManager: Task 2 in stage 0.0 failed 1 times; aborting job
15/07/14 16:45:45 INFO TaskSchedulerImpl: Cancelling stage 0
15/07/14 16:45:45 INFO Executor: Executor is trying to kill task 3.0 in stage 0.0 (TID 3)
15/07/14 16:45:45 INFO TaskSchedulerImpl: Stage 0 was cancelled
15/07/14 16:45:45 INFO DAGScheduler: ShuffleMapStage 0 (count at SimpleApp.java:87) failed in 0.303 s
15/07/14 16:45:45 INFO DAGScheduler: Job 0 failed: count at SimpleApp.java:87, took 0.472835 s
Exception in thread "main" org.apache.spark.SparkException: Job aborted due to stage failure: Task 2 in stage 0.0 failed 1 times, most recent failure: Lost task 2.0 in stage 0.0 (TID 2, localhost): org.elasticsearch.hadoop.EsHadoopIllegalStateException: Field 'name' not found; typically this occurs with arrays which are not mapped as single value
	at org.elasticsearch.spark.sql.RowValueReader$class.rowOrder(RowValueReader.scala:26)
	at org.elasticsearch.spark.sql.ScalaRowValueReader.rowOrder(ScalaEsRowValueReader.scala:13)
	at org.elasticsearch.spark.sql.ScalaRowValueReader.createMap(ScalaEsRowValueReader.scala:32)
	at org.elasticsearch.hadoop.serialization.ScrollReader.map(ScrollReader.java:620)
	at org.elasticsearch.hadoop.serialization.ScrollReader.read(ScrollReader.java:559)
	at org.elasticsearch.hadoop.serialization.ScrollReader.list(ScrollReader.java:600)
	at org.elasticsearch.hadoop.serialization.ScrollReader.read(ScrollReader.java:562)
	at org.elasticsearch.hadoop.serialization.ScrollReader.map(ScrollReader.java:636)
	at org.elasticsearch.hadoop.serialization.ScrollReader.read(ScrollReader.java:559)
	at org.elasticsearch.hadoop.serialization.ScrollReader.readHitAsMap(ScrollReader.java:358)
	at org.elasticsearch.hadoop.serialization.ScrollReader.readHit(ScrollReader.java:293)
	at org.elasticsearch.hadoop.serialization.ScrollReader.read(ScrollReader.java:188)
	at org.elasticsearch.hadoop.serialization.ScrollReader.read(ScrollReader.java:167)
	at org.elasticsearch.hadoop.rest.RestRepository.scroll(RestRepository.java:403)
	at org.elasticsearch.hadoop.rest.ScrollQuery.hasNext(ScrollQuery.java:76)
	at org.elasticsearch.spark.rdd.AbstractEsRDDIterator.hasNext(AbstractEsRDDIterator.scala:43)
	at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:327)
	at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:327)
	at org.apache.spark.sql.execution.Aggregate$$anonfun$doExecute$1$$anonfun$6.apply(Aggregate.scala:129)
	at org.apache.spark.sql.execution.Aggregate$$anonfun$doExecute$1$$anonfun$6.apply(Aggregate.scala:126)
	at org.apache.spark.rdd.RDD$$anonfun$mapPartitions$1$$anonfun$apply$17.apply(RDD.scala:686)
	at org.apache.spark.rdd.RDD$$anonfun$mapPartitions$1$$anonfun$apply$17.apply(RDD.scala:686)
	at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:35)
	at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:277)
	at org.apache.spark.rdd.RDD.iterator(RDD.scala:244)
	at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:35)
	at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:277)
	at org.apache.spark.rdd.RDD.iterator(RDD.scala:244)
	at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:70)
	at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:41)
	at org.apache.spark.scheduler.Task.run(Task.scala:70)
	at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:213)
	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
	at java.lang.Thread.run(Thread.java:745)

Driver stacktrace:
	at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$failJobAndIndependentStages(DAGScheduler.scala:1266)
	at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1257)
	at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1256)
	at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
	at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47)
	at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:1256)
	at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:730)
	at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:730)
	at scala.Option.foreach(Option.scala:236)
	at org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:730)
	at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1450)
	at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1411)
	at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:48)
15/07/14 16:45:45 INFO SparkContext: Invoking stop() from shutdown hook
15/07/14 16:45:45 INFO Executor: Executor killed task 3.0 in stage 0.0 (TID 3)
15/07/14 16:45:45 WARN TaskSetManager: Lost task 3.0 in stage 0.0 (TID 3, localhost): TaskKilled (killed intentionally)
15/07/14 16:45:45 INFO TaskSchedulerImpl: Removed TaskSet 0.0, whose tasks have all completed, from pool 
15/07/14 16:45:45 INFO SparkUI: Stopped Spark web UI at http://192.168.0.16:4040
15/07/14 16:45:45 INFO DAGScheduler: Stopping DAGScheduler
15/07/14 16:45:45 INFO MapOutputTrackerMasterEndpoint: MapOutputTrackerMasterEndpoint stopped!
15/07/14 16:45:45 INFO Utils: path = /private/var/folders/wq/qbdzhygn3sz4vpd_9r3nys4dkyxdqy/T/spark-9dfdf0ab-d148-4fe5-b774-67a038c85998/blockmgr-a62cc2d7-6268-4f26-bbe5-1241014e1c71, already present as root for deletion.
15/07/14 16:45:45 INFO MemoryStore: MemoryStore cleared
15/07/14 16:45:45 INFO BlockManager: BlockManager stopped
15/07/14 16:45:45 INFO BlockManagerMaster: BlockManagerMaster stopped
15/07/14 16:45:45 INFO OutputCommitCoordinator$OutputCommitCoordinatorEndpoint: OutputCommitCoordinator stopped!
15/07/14 16:45:45 INFO SparkContext: Successfully stopped SparkContext
15/07/14 16:45:45 INFO Utils: Shutdown hook called
15/07/14 16:45:45 INFO Utils: Deleting directory /private/var/folders/wq/qbdzhygn3sz4vpd_9r3nys4dkyxdqy/T/spark-9dfdf0ab-d148-4fe5-b774-67a038c85998

This is my dependency list

	<dependency>
		<groupId>org.apache.spark</groupId>
		<artifactId>spark-core_2.10</artifactId>
		<version>1.4.0</version>
	</dependency>
	<dependency>
		<groupId>org.apache.spark</groupId>
		<artifactId>spark-sql_2.10</artifactId>
		<version>1.4.0</version>
	</dependency>
	<dependency>
		<groupId>org.elasticsearch</groupId>
		<artifactId>elasticsearch-spark_2.10</artifactId>
		<version>2.2.0.BUILD-SNAPSHOT</version>
	</dependency>

Indeed, that is the case, and there is an issue open for it. To reiterate the problem quickly: in terms of mapping, Elasticsearch does not differentiate between a field with one value and a field with multiple values. As a client, there is no way to know which it is until the data is returned.
This is problematic with Spark SQL since the schema needs to be known up front. If the nested object/field contains only one value, it is mapped properly; if it contains multiple values, an array is returned, which cannot be mapped to a single value, leading to the exception you are facing.
The solution going forward is to allow the user to specify which fields should be mapped as arrays and to use that in the mapping as well (meaning fields with only one value will be mapped as an array with a single entry).
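
To make the idea above concrete, here is a minimal sketch in plain Java of "always treat the declared fields as arrays". It is only an illustration, not the ES-Hadoop implementation: the class `ArrayFieldNormalizer`, the method `normalize`, and the field name `tags` are hypothetical names chosen for the example.

```java
import java.util.Arrays;
import java.util.Collections;
import java.util.HashSet;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;

public class ArrayFieldNormalizer {

    // Hypothetical helper: for every field the user declared as an array,
    // wrap a lone value in a single-element list so the field has the same
    // shape in every document. Other fields are copied unchanged.
    public static Map<String, Object> normalize(Map<String, Object> doc, Set<String> arrayFields) {
        Map<String, Object> out = new LinkedHashMap<>();
        for (Map.Entry<String, Object> e : doc.entrySet()) {
            Object value = e.getValue();
            if (arrayFields.contains(e.getKey()) && !(value instanceof List)) {
                value = Collections.singletonList(value);
            }
            out.put(e.getKey(), value);
        }
        return out;
    }

    public static void main(String[] args) {
        // Assumption: the user tells us up front that "tags" is an array field.
        Set<String> arrayFields = new HashSet<>(Arrays.asList("tags"));

        Map<String, Object> single = new LinkedHashMap<>();
        single.put("id", 1L);
        single.put("tags", "a");                      // one value

        Map<String, Object> multi = new LinkedHashMap<>();
        multi.put("id", 2L);
        multi.put("tags", Arrays.asList("a", "b"));   // several values

        System.out.println(normalize(single, arrayFields)); // {id=1, tags=[a]}
        System.out.println(normalize(multi, arrayFields));  // {id=2, tags=[a, b]}
    }
}
```

With both documents normalized this way, a strict schema can declare `tags` as an array once and every row fits it.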

Hi Costin,

Can you give an ETA for a fix for this problem? We encounter the same issue: we have a complicated schema with nested objects, which are not picked up correctly by Spark SQL, resulting in a '... typically this occurs with arrays which are not mapped as single value' error.

What might be possible workarounds? Build our own StructType and/or schema, or just create our own RDD that maps each row to a case class?

Thanks in advance,

Leonard

Hopefully a couple of weeks (due to travel).

Hi Costin
Is there any update on this? Thanks

This has been fixed in 2.2-beta1. Have you tried it out?

I tried it with 2.2.0.BUILD-SNAPSHOT and I am still having the problem:

15/12/08 15:00:43 ERROR Executor: Exception in task 3.0 in stage 8.0 (TID 18)
org.elasticsearch.hadoop.EsHadoopIllegalStateException: Field 'people' not found; typically this occurs with arrays which are not mapped as single value
at org.elasticsearch.spark.sql.RowValueReader$class.rowColumns(RowValueReader.scala:33)
at org.elasticsearch.spark.sql.ScalaRowValueReader.rowColumns(ScalaEsRowValueReader.scala:13)
at org.elasticsearch.spark.sql.ScalaRowValueReader.createMap(ScalaEsRowValueReader.scala:49)
at org.elasticsearch.hadoop.serialization.ScrollReader.map(ScrollReader.java:645)
at org.elasticsearch.hadoop.serialization.ScrollReader.read(ScrollReader.java:588)
at org.elasticsearch.hadoop.serialization.ScrollReader.map(ScrollReader.java:661)
at org.elasticsearch.hadoop.serialization.ScrollReader.read(ScrollReader.java:588)
at org.elasticsearch.hadoop.serialization.ScrollReader.readHitAsMap(ScrollReader.java:383)
at org.elasticsearch.hadoop.serialization.ScrollReader.readHit(ScrollReader.java:318)
at org.elasticsearch.hadoop.serialization.ScrollReader.read(ScrollReader.java:213)
at org.elasticsearch.hadoop.serialization.ScrollReader.read(ScrollReader.java:186)
at org.elasticsearch.hadoop.rest.RestRepository.scroll(RestRepository.java:438)
at org.elasticsearch.hadoop.rest.ScrollQuery.hasNext(ScrollQuery.java:86)
at org.elasticsearch.spark.rdd.AbstractEsRDDIterator.hasNext(AbstractEsRDDIterator.scala:43)

Thanks, I'll look into it.

Hello,

Any update on this?

Or maybe any advice about the data schema in order to be fully "compliant" with SparkSQL? It looks like nested JSON is not working, is it?

Thanks,

I am waiting for an answer too.

The strange thing is that SparkSQL (like Apache Drill) is fully JSON friendly, as you can see at http://spark.apache.org/docs/latest/sql-programming-guide.html#json-datasets. There is no problem working with plain JSON files, so couldn't the Elasticsearch API be seen simply as an HTTP loader here?

Everyone, this issue has been addressed in ES-Hadoop 2.2-rc1 and documented here.
If it doesn't address your issues, please open new issues or posts.
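
For readers who land here without the link: as far as I know, the relevant ES-Hadoop option is `es.read.field.as.array.include`, which takes a comma-separated list of field names to read as arrays. Treat that as an assumption and check the ES-Hadoop configuration docs for your version. The sketch below only collects the setting in a plain `java.util.Properties` object so that it stays self-contained; the class `EsReadSettings` and the method `arrayFieldSettings` are hypothetical, and the field names are taken from the stack traces in this thread.

```java
import java.util.Properties;

public class EsReadSettings {

    // Hypothetical helper: builds the setting that declares which fields
    // should always be read as arrays. The key name is assumed to be the
    // ES-Hadoop option described above; verify it for your version.
    public static Properties arrayFieldSettings(String... fields) {
        Properties props = new Properties();
        props.setProperty("es.read.field.as.array.include", String.join(",", fields));
        return props;
    }

    public static void main(String[] args) {
        Properties props = arrayFieldSettings("people", "session");
        // Each entry would then be copied into the Spark configuration
        // (for example with SparkConf.set(key, value)) before reading.
        props.forEach((k, v) -> System.out.println(k + "=" + v));
    }
}
```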

@ebuildy While SparkSQL does provide some JSON support, I would argue it is not "fully JSON friendly". Below is a quote from the page you linked:

Note that the file that is offered as a json file is not a typical JSON file. Each
line must contain a separate, self-contained valid JSON object. As a consequence,
a regular multi-line JSON file will most often fail.

That is, a regular JSON file won't work. Neither will JSON containing documents with different content types.
In fact, the issue in dealing with JSON and SparkSQL is that JSON is fairly loose while SparkSQL is quite strict.

Conceptually yes, one can see ES-Hadoop as a distributed HTTP loader. The complicated bits, though, come when dealing with that data effectively: what happens if there's pushback, what about filtering fields or transforming them, what if the docs don't fit the same structure (that's why documents != tables), etc.

Having code that works reliably and satisfied users is the first thing on my list, and I'm looking for anything that makes my job easier. The JSON support in Spark / Spark SQL, though, is not (yet) there.
