Save JavaRDD to Elasticsearch

I have written a class that reads Parquet data and saves it to Elasticsearch as a JavaRDD.

Spark Version: 1.5
CDH Version: CDH-5.5.2-1.cdh5.5.2.p0.4

I am getting the following exception during the saveToEs stage:

17/02/20 15:21:24 WARN ThrowableSerializationWrapper: Task exception could not be deserialized
java.lang.ClassNotFoundException: org.elasticsearch.hadoop.serialization.EsHadoopSerializationException
at java.net.URLClassLoader$1.run(URLClassLoader.java:366)
at java.net.URLClassLoader$1.run(URLClassLoader.java:355)
at java.security.AccessController.doPrivileged(Native Method)
at java.net.URLClassLoader.findClass(URLClassLoader.java:354)
at java.lang.ClassLoader.loadClass(ClassLoader.java:425)
at sun.misc.Launcher$AppClassLoader.loadClass(Launcher.java:308)
at java.lang.ClassLoader.loadClass(ClassLoader.java:358)
at java.lang.Class.forName0(Native Method)
at java.lang.Class.forName(Class.java:270)
at org.apache.spark.serializer.JavaDeserializationStream$$anon$1.resolveClass(JavaSerializer.scala:67)
at java.io.ObjectInputStream.readNonProxyDesc(ObjectInputStream.java:1612)
at java.io.ObjectInputStream.readClassDesc(ObjectInputStream.java:1517)
at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1771)
at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1350)
at java.io.ObjectInputStream.readObject(ObjectInputStream.java:370)
at org.apache.spark.ThrowableSerializationWrapper.readObject(TaskEndReason.scala:167)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:606)
at java.io.ObjectStreamClass.invokeReadObject(ObjectStreamClass.java:1017)
at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1893)
at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1798)
at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1350)
at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:1990)
at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1915)
at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1798)
at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1350)
at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:1990)
at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1915)
at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1798)
at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1350)
at java.io.ObjectInputStream.readObject(ObjectInputStream.java:370)
at org.apache.spark.serializer.JavaDeserializationStream.readObject(JavaSerializer.scala:72)
at org.apache.spark.serializer.JavaSerializerInstance.deserialize(JavaSerializer.scala:98)
at org.apache.spark.scheduler.TaskResultGetter$$anon$3$$anonfun$run$2.apply$mcV$sp(TaskResultGetter.scala:108)
at org.apache.spark.scheduler.TaskResultGetter$$anon$3$$anonfun$run$2.apply(TaskResultGetter.scala:105)
at org.apache.spark.scheduler.TaskResultGetter$$anon$3$$anonfun$run$2.apply(TaskResultGetter.scala:105)
at org.apache.spark.util.Utils$.logUncaughtExceptions(Utils.scala:1698)
at org.apache.spark.scheduler.TaskResultGetter$$anon$3.run(TaskResultGetter.scala:105)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
at java.lang.Thread.run(Thread.java:745)
17/02/20 15:21:24 ERROR TaskResultGetter: Could not deserialize TaskEndReason: ClassNotFound with classloader org.apache.spark.util.MutableURLClassLoader@463929c7

Below is the code snippet:

JavaRDD<Charval> Charvals = charVal_select.javaRDD().map(new Function<Row, Charval>() {
    private static final long serialVersionUID = 1L;

    public Charval call(Row y) throws Exception {
        return Charval.builder()
                .charvalCode(y.getString(0))
                .charCode(y.getString(1))
                .charvalType(y.getString(1))
                .numericVal(y.getDouble(2))
                .unitOfMeasure(y.getString(3))
                .proposalFlg(y.getString(4))
                .intrnlUseFlg(y.getString(5))
                .cmrclySensitiveFlg(y.getString(6))
                .updDttmUtc(y.getTimestamp(7))
                .descriptionAlt(DescriptionAlt.builder().dscrAltEnZz(y.getString(8)).build())
                .descriptionMain(DescriptionMain.builder().dscrMainEnZz(y.getString(9)).build())
                .build();
    }
});

// JavaRDD _esCharValRDD = sc.parallelize(Arrays.asList(esCharValRDD.first()), 40000);
if (ESMainProcessor.isFullSync()) {
    // ESNativeOperations.deleteIndex(UtilityManager.getESNativeClient(), "rd_cval/cval");
    // JavaEsSpark.saveToEs(esCharValRDD, "rd_cval/cval");
} else {

}

JavaEsSpark.saveToEs(Charvals, "rd_cval/cval");

I tried Kryo serialization as well. This cluster has a lot of other systems running on it, so Spark cannot be upgraded; if the solution requires an upgrade, I at least need a workaround. Please help.
I also tried parallelizing, which took a huge amount of time and still failed during the write to ES.
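
For reference, this is roughly the Kryo setup I tried (a minimal sketch; the app name and the list of registered classes are illustrative):

import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaSparkContext;

// Switch task serialization to Kryo and register the domain classes
// that travel between the driver and the executors.
SparkConf conf = new SparkConf()
        .setAppName("es-charval-sync")
        .set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
        .registerKryoClasses(new Class<?>[] {
                Charval.class, DescriptionAlt.class, DescriptionMain.class
        });
JavaSparkContext sc = new JavaSparkContext(conf);

This did not make a difference; the same exception still appears during saveToEs.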

I had the same issue before, and I think it was caused by an incorrect es-hadoop package.

What version of the es-hadoop package are you using? Can you post your spark-submit command?
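
In your stack trace, the driver cannot even load org.elasticsearch.hadoop.serialization.EsHadoopSerializationException while deserializing the task failure, which usually means the es-hadoop jar is not on the classpath and the real error is being masked. A sketch of shipping the connector with the job (the jar path, version, and host below are placeholders; pick the artifact that matches your Spark/Scala build):

import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaSparkContext;

// Ship the connector jar with the application so the executors can load
// its classes, and point the connector at the cluster. All values here
// are examples only.
SparkConf conf = new SparkConf()
        .setAppName("es-charval-sync")
        .set("es.nodes", "es-host:9200")
        .setJars(new String[] { "/path/to/elasticsearch-spark_2.10-2.2.0.jar" });
JavaSparkContext sc = new JavaSparkContext(conf);

The spark-submit equivalent is --jars /path/to/elasticsearch-spark_2.10-2.2.0.jar; on Spark 1.5 you may also need the same jar on --driver-class-path, since the driver deserializes task failures with a classloader that does not always see --jars.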
