Task exception could not be deserialized

I have a Spark job that runs fine on localhost, but when I run it on EMR I get this warning:

WARN ThrowableSerializationWrapper: Task exception could not be deserialized
java.lang.ClassNotFoundException: org.elasticsearch.hadoop.EsHadoopException

and this error:

16/03/31 16:32:55 ERROR TaskResultGetter: Could not deserialize TaskEndReason: ClassNotFound with classloader org.apache.spark.util.MutableURLClassLoader@216b55e2

• My Spark version on EMR is 1.5.2
• I was using elasticsearch-spark_2.10-2.2.0-rc1, but I also tried the latest release, elasticsearch-spark_2.10-2.2.0

My code simply creates a bunch of dummy documents and inserts them into Elasticsearch.

def deviceDoc(deviceID: String): Any = {
  // snip because this post is limited to 500 chars
 return map;
}
val deviceRDD = deviceAggDF.map(x => deviceDoc(x(0).toString))
EsSpark.saveToEs(deviceRDD, "parent-child-devices/device", Map("es.mapping.id" -> "_id"))

Submitting the job:

spark-submit \
    --jars ./elasticsearch-spark_2.10-2.2.0-rc1.jar \
    --class com.me.MyJob \
    --packages com.databricks:spark-csv_2.11:1.3.0 \
    ./myproject.jar

The error

16/03/31 15:11:12 WARN ThrowableSerializationWrapper: Task exception could not be deserialized
java.lang.ClassNotFoundException: org.elasticsearch.hadoop.EsHadoopException
	at java.net.URLClassLoader$1.run(URLClassLoader.java:366)
	at java.net.URLClassLoader$1.run(URLClassLoader.java:355)
	at java.security.AccessController.doPrivileged(Native Method)
	at java.net.URLClassLoader.findClass(URLClassLoader.java:354)
	at java.lang.ClassLoader.loadClass(ClassLoader.java:425)
	at sun.misc.Launcher$AppClassLoader.loadClass(Launcher.java:308)
	at java.lang.ClassLoader.loadClass(ClassLoader.java:358)
	at java.lang.Class.forName0(Native Method)
	at java.lang.Class.forName(Class.java:278)
	at org.apache.spark.serializer.JavaDeserializationStream$$anon$1.resolveClass(JavaSerializer.scala:67)
	at java.io.ObjectInputStream.readNonProxyDesc(ObjectInputStream.java:1612)
	at java.io.ObjectInputStream.readClassDesc(ObjectInputStream.java:1517)
	at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1771)
	at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1350)
	at java.io.ObjectInputStream.readObject(ObjectInputStream.java:370)
	at org.apache.spark.ThrowableSerializationWrapper.readObject(TaskEndReason.scala:167)
	at sun.reflect.GeneratedMethodAccessor44.invoke(Unknown Source)
	at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
	at java.lang.reflect.Method.invoke(Method.java:606)
	at java.io.ObjectStreamClass.invokeReadObject(ObjectStreamClass.java:1058)
	at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1897)
	at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1798)
	at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1350)
	at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:1997)
	at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1921)
	at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1798)
	at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1350)
	at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:1997)
	at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1921)
	at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1798)
	at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1350)
	at java.io.ObjectInputStream.readObject(ObjectInputStream.java:370)
	at org.apache.spark.serializer.JavaDeserializationStream.readObject(JavaSerializer.scala:72)
	at org.apache.spark.serializer.JavaSerializerInstance.deserialize(JavaSerializer.scala:98)
	at org.apache.spark.scheduler.TaskResultGetter$$anon$3$$anonfun$run$2.apply$mcV$sp(TaskResultGetter.scala:108)
	at org.apache.spark.scheduler.TaskResultGetter$$anon$3$$anonfun$run$2.apply(TaskResultGetter.scala:105)
	at org.apache.spark.scheduler.TaskResultGetter$$anon$3$$anonfun$run$2.apply(TaskResultGetter.scala:105)
	at org.apache.spark.util.Utils$.logUncaughtExceptions(Utils.scala:1699)
	at org.apache.spark.scheduler.TaskResultGetter$$anon$3.run(TaskResultGetter.scala:105)
	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
	at java.lang.Thread.run(Thread.java:745)
16/03/31 15:11:12 ERROR TaskResultGetter: Could not deserialize TaskEndReason: ClassNotFound with classloader org.apache.spark.util.MutableURLClassLoader@1cba98ca

This error occurs every now and then, but I can't reproduce it despite several attempts.

As far as I can tell, the problem lies with Spark. Namely, one of the tasks fails (maybe because the data to/from ES is malformed or something) and ES-Hadoop throws the exception higher up the stack. Spark catches this, shuts down the workers and tries to bubble up the exception in the master. However, since the job classloader has already been closed automatically, the exception cannot be deserialized (despite the class being available on the classpath), which leads to a different error that masks the original one.

In fact, if you look closely through the logs, you should see the initial EsHadoopException stack trace.

P.S. The es-hadoop connector is available as a Spark package as well (see the sketch below).
P.P.S. Upgrading to Spark 1.6.1 might help.
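
For example, pulling the connector in through --packages instead of shipping a local jar might look roughly like this (just a sketch; the org.elasticsearch:elasticsearch-spark_2.10:2.2.0 coordinate is my assumption for a Scala 2.10 / Spark 1.5 build, so adjust it to whatever matches your cluster):

spark-submit \
    --packages org.elasticsearch:elasticsearch-spark_2.10:2.2.0 \
    --class com.me.MyJob \
    ./myproject.jar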

I have a feeling this is caused by my mock device document function returning Any. If I have this function return only a Map with an _id attribute, the job never seems to fail.

This might just be me not knowing Scala, but I believe the correct return type for this function should be a Map with the nested types spelled out (see the sketch after the code below).

def deviceDoc(deviceID: String): Any = {
  val map = Map(
    "_id" -> deviceID,
    "device" -> Map(
      "carrier" -> random(Set("AT&T", "Verision")),
      "isp" -> random(Set("isp_214", "isp_215")),
      "model" -> random(Set("4", "5s")),
      "os" -> random(Set("iOS", "android"))
    ),
    "demographics" -> Seq(
      Map(
        "age_range" -> random(Set("1-10", "11-20", "21-30", "31-40", "41-50", "old as dirt")),
        "conf_score" -> 57,
        "ethnicity" -> random(Set("asian", "white", "other")),
        "gender" -> random(Set("M", "F")),
        "hhi" -> random(Set("50K-60K", "70K-140K", "150K-160K")),
        "source" -> "datasrc_5"
      )
    )
  )

  return map
}
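
For what it's worth, a minimal sketch of what an explicitly typed signature could look like (illustrative only; the value type still ends up as Any because the document mixes strings, nested maps and sequences):

// Illustrative sketch: declare the return type explicitly instead of Any.
// The values still have to be typed as Any since the map mixes strings,
// nested Maps and Seqs.
def deviceDoc(deviceID: String): Map[String, Any] =
  Map(
    "_id" -> deviceID,
    "device" -> Map("carrier" -> "AT&T", "os" -> "iOS"),
    "demographics" -> Seq(Map("source" -> "datasrc_5", "conf_score" -> 57))
  )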

Fair enough, but the exception should still be able to propagate properly. Can you run a basic integration test in a local Spark cluster? Do you see the exception being raised, and if so, what is it?
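
Something along these lines would be enough as a starting point (a minimal sketch; the master URL, es.nodes value and index name are placeholders to adapt to your setup):

import org.apache.spark.{SparkConf, SparkContext}
import org.elasticsearch.spark.rdd.EsSpark

object LocalEsSmokeTest {
  def main(args: Array[String]): Unit = {
    // Local master so the whole job (and any task exception) stays in one JVM.
    val conf = new SparkConf()
      .setMaster("local[*]")
      .setAppName("es-hadoop-smoke-test")
      .set("es.nodes", "localhost:9200") // point this at a test ES instance

    val sc = new SparkContext(conf)
    try {
      // A couple of dummy documents, same shape as in your job.
      val docs = sc.parallelize(Seq(Map("_id" -> "device_1"), Map("_id" -> "device_2")))
      EsSpark.saveToEs(docs, "parent-child-devices/device", Map("es.mapping.id" -> "_id"))
    } finally {
      sc.stop()
    }
  }
}

Run it against a throwaway index and check the driver log; with everything in one process, the original exception should surface directly instead of failing to deserialize.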

Actually, I take that back. I just ran a job with only the _id attribute and the error was still raised :frowning:

Geez, even removing the function and simply returning a Map throws the error.

val deviceRDD = aggDF.map(x => Map("_id" -> x(0)))
EsSpark.saveToEs(deviceRDD, "parent-child-devices/device", Map("es.mapping.id" -> "_id"))

The error:

16/04/06 16:04:47 INFO TaskSetManager: Starting task 29.1 in stage 1.0 (TID 340, ip-10-136-126-99.ec2.internal, PROCESS_LOCAL, 2316 bytes)
16/04/06 16:04:47 WARN ThrowableSerializationWrapper: Task exception could not be deserialized
java.lang.ClassNotFoundException: org.elasticsearch.hadoop.EsHadoopException
	at java.net.URLClassLoader$1.run(URLClassLoader.java:366)
	at java.net.URLClassLoader$1.run(URLClassLoader.java:355)
	at java.security.AccessController.doPrivileged(Native Method)
	at java.net.URLClassLoader.findClass(URLClassLoader.java:354)
	at java.lang.ClassLoader.loadClass(ClassLoader.java:425)
	at sun.misc.Launcher$AppClassLoader.loadClass(Launcher.java:308)
	at java.lang.ClassLoader.loadClass(ClassLoader.java:358)
	at java.lang.Class.forName0(Native Method)
	at java.lang.Class.forName(Class.java:278)
	at org.apache.spark.serializer.JavaDeserializationStream$$anon$1.resolveClass(JavaSerializer.scala:67)
	at java.io.ObjectInputStream.readNonProxyDesc(ObjectInputStream.java:1612)
	at java.io.ObjectInputStream.readClassDesc(ObjectInputStream.java:1517)
	at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1771)
	at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1350)
	at java.io.ObjectInputStream.readObject(ObjectInputStream.java:370)
	at org.apache.spark.ThrowableSerializationWrapper.readObject(TaskEndReason.scala:167)
	at sun.reflect.GeneratedMethodAccessor29.invoke(Unknown Source)
	at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
	at java.lang.reflect.Method.invoke(Method.java:606)
	at java.io.ObjectStreamClass.invokeReadObject(ObjectStreamClass.java:1058)
	at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1897)
	at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1798)
	at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1350)
	at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:1997)
	at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1921)
	at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1798)
	at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1350)
	at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:1997)
	at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1921)
	at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1798)
	at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1350)
	at java.io.ObjectInputStream.readObject(ObjectInputStream.java:370)
	at org.apache.spark.serializer.JavaDeserializationStream.readObject(JavaSerializer.scala:72)
	at org.apache.spark.serializer.JavaSerializerInstance.deserialize(JavaSerializer.scala:98)
	at org.apache.spark.scheduler.TaskResultGetter$$anon$3$$anonfun$run$2.apply$mcV$sp(TaskResultGetter.scala:108)
	at org.apache.spark.scheduler.TaskResultGetter$$anon$3$$anonfun$run$2.apply(TaskResultGetter.scala:105)
	at org.apache.spark.scheduler.TaskResultGetter$$anon$3$$anonfun$run$2.apply(TaskResultGetter.scala:105)
	at org.apache.spark.util.Utils$.logUncaughtExceptions(Utils.scala:1699)
	at org.apache.spark.scheduler.TaskResultGetter$$anon$3.run(TaskResultGetter.scala:105)
	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
	at java.lang.Thread.run(Thread.java:745)
16/04/06 16:04:47 ERROR TaskResultGetter: Could not deserialize TaskEndReason: ClassNotFound with classloader org.apache.spark.util.MutableURLClassLoader@dd37992
16/04/06 16:04:47 WARN TaskSetManager: Lost task 155.1 in stage 1.0 (TID 265, ip-10-169-62-32.ec2.internal): UnknownReason

I'm not sure how to run an integration test on the cluster. I am able to run spark-shell and walk through each step.

The difficult thing is that the error doesn't happen consistently. I'm running the job multiple times on a small file, and I seem to get the error 50% of the time.

So I'm stepping through my job in spark-shell to ensure the data in my RDD is OK. I cannot identify any issues with the data.
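
Roughly, all I'm doing in the shell is eyeballing the mapped records before the save, e.g.:

// spot-check a few documents and make sure the RDD materializes cleanly
deviceRDD.take(5).foreach(println)
deviceRDD.count()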

Looks like my EMR cluster was using an old version of Spark. I now have a cluster running 1.6 and I believe the issue is fixed. Thanks!

So upgrading the Spark cluster to 1.6 seems to have fixed the problem, right? If the issue keeps popping up, please speak up.

I'm having the same issue with a long-running job: a few tasks are failing and I'm seeing the same error stack. I'm running Spark 1.5.2.
Is updating to Spark 1.6.x guaranteed to fix this?
Thanks