Saving Nested Document - ES Hadoop - saveToEs

I have written code to save a JavaRDD to Elasticsearch. Documents with no nested mappings save without any issue, but for any document with nested mappings the exception below is thrown during execution. Am I doing something wrong here?

JavaRDD<String> Charvals = charVal_select.javaRDD().map(new Function<Row, String>() {
    // Build one JSON document per row; descriptionAlt and descriptionMain
    // are serialized as nested sub-documents via their builders' toString().
    @Override
    public String call(Row y) throws Exception {
        return Charval.builder()
                .charvalCode(y.getString(0))
                .charCode(y.getString(1))
                .charvalType(y.getString(1))
                .numericVal(y.getDouble(2))
                .unitOfMeasure(y.getString(3))
                .proposalFlg(y.getString(4))
                .intrnlUseFlg(y.getString(5))
                .cmrclySensitiveFlg(y.getString(6))
                .updDttmUtc(y.getString(7))
                .descriptionAlt(DescriptionAlt.builder().dscrAltEnZz(y.getString(8)).build().toString())
                .descriptionMain(DescriptionMain.builder().dscrMainEnZz(y.getString(9)).build().toString())
                .build().toString();
    }
});

JavaEsSpark.saveJsonToEs(Charvals, "rd_cval/cval",
        ImmutableMap.of("es.mapping.id", "CHAR_CODE"));

Here descriptionAlt and descriptionMain are my sub-documents.
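Since saveJsonToEs expects every element of the RDD to already be a complete JSON document, one quick sanity check (a sketch only - it relies on Jackson's databind, which Spark already ships on its classpath) is to parse a few sample strings before writing:

    import com.fasterxml.jackson.databind.ObjectMapper;

    // Parse a handful of the generated strings; readTree() throws if the
    // builders' toString() does not actually emit well-formed JSON.
    // (Call this from a method that declares throws IOException.)
    ObjectMapper mapper = new ObjectMapper();
    for (String doc : Charvals.take(5)) {
        System.out.println(mapper.readTree(doc).toString());
    }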

And the exception is:

17/03/02 13:06:24 WARN ThrowableSerializationWrapper: Task exception could not be deserialized
java.lang.ClassNotFoundException: org.elasticsearch.hadoop.serialization.EsHadoopSerializationException
at java.net.URLClassLoader$1.run(URLClassLoader.java:366)
at java.net.URLClassLoader$1.run(URLClassLoader.java:355)
at java.security.AccessController.doPrivileged(Native Method)
at java.net.URLClassLoader.findClass(URLClassLoader.java:354)
at java.lang.ClassLoader.loadClass(ClassLoader.java:425)
at sun.misc.Launcher$AppClassLoader.loadClass(Launcher.java:308)
at java.lang.ClassLoader.loadClass(ClassLoader.java:358)
at java.lang.Class.forName0(Native Method)
at java.lang.Class.forName(Class.java:270)
at org.apache.spark.serializer.JavaDeserializationStream$$anon$1.resolveClass(JavaSerializer.scala:67)
at java.io.ObjectInputStream.readNonProxyDesc(ObjectInputStream.java:1612)
at java.io.ObjectInputStream.readClassDesc(ObjectInputStream.java:1517)
at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1771)
at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1350)
at java.io.ObjectInputStream.readObject(ObjectInputStream.java:370)
at org.apache.spark.ThrowableSerializationWrapper.readObject(TaskEndReason.scala:167)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:606)
at java.io.ObjectStreamClass.invokeReadObject(ObjectStreamClass.java:1017)
at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1893)
at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1798)
at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1350)
at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:1990)
at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1915)
at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1798)
at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1350)
at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:1990)
at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1915)
at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1798)
at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1350)
at java.io.ObjectInputStream.readObject(ObjectInputStream.java:370)
at org.apache.spark.serializer.JavaDeserializationStream.readObject(JavaSerializer.scala:72)
at org.apache.spark.serializer.JavaSerializerInstance.deserialize(JavaSerializer.scala:98)
at org.apache.spark.scheduler.TaskResultGetter$$anon$3$$anonfun$run$2.apply$mcV$sp(TaskResultGetter.scala:108)
at org.apache.spark.scheduler.TaskResultGetter$$anon$3$$anonfun$run$2.apply(TaskResultGetter.scala:105)
at org.apache.spark.scheduler.TaskResultGetter$$anon$3$$anonfun$run$2.apply(TaskResultGetter.scala:105)
at org.apache.spark.util.Utils$.logUncaughtExceptions(Utils.scala:1698)
at org.apache.spark.scheduler.TaskResultGetter$$anon$3.run(TaskResultGetter.scala:105)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
at java.lang.Thread.run(Thread.java:745)
17/03/02 13:06:24 ERROR TaskResultGetter: Could not deserialize TaskEndReason: ClassNotFound with classloader org.apache.spark.util.MutableURLClassLoader@29c214c0

Please help me!

You seem to be running into this problem with Spark: Eager loading in Spark to prevent ClassNotFound in case of exceptions · Issue #713 · elastic/elasticsearch-hadoop · GitHub. The problem essentially comes down to Spark closing the classloader that contains our connector code too early, and thus being unable to find the exception classes when it deserializes the task failure back on the driver.
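One workaround that has helped in similar reports (a sketch only - the jar path, version, and class name below are illustrative assumptions) is to put the connector jar on the driver's system classpath via --driver-class-path, in addition to distributing it with --jars, so the driver can resolve the connector's exception classes when a failed task is reported back:

    spark-submit \
      --driver-class-path /path/to/elasticsearch-hadoop-5.2.2.jar \
      --jars /path/to/elasticsearch-hadoop-5.2.2.jar \
      --class com.example.CharvalJob your-app.jar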

I would advise increasing the logging level on your tasks to try to find the underlying serialization issue.
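For example, with the log4j 1.x setup that Spark ships by default (a sketch - the logger names cover the connector's packages; if you configure logging through log4j.properties, set the equivalent entries there, and note that executors read their own log4j.properties for task-side logs):

    import org.apache.log4j.Level;
    import org.apache.log4j.Logger;

    // Surface the underlying EsHadoopSerializationException details
    // instead of the masked ClassNotFoundException seen on the driver.
    Logger.getLogger("org.elasticsearch.hadoop").setLevel(Level.DEBUG);
    Logger.getLogger("org.elasticsearch.spark").setLevel(Level.DEBUG);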
