java.lang.ClassCastException: scala.collection.Iterator$$anon$11 cannot be cast to scala.Tuple2


(Philip K. Adetiloye) #1

I am having trouble running my job on my Spark cluster.
Each time I run the job, I get this error:


Exception in thread "main" org.apache.spark.SparkException: Job aborted due to stage failure: Task 3 in stage 0.0 failed 4 times, most recent failure: Lost task 3.3 in stage 0.0 (TID 12, 10.0.0.207): java.lang.ClassCastException: scala.collection.Iterator$$anon$11 cannot be cast to scala.Tuple2
        at org.apache.spark.sql.DataFrame$$anonfun$33.apply(DataFrame.scala:1189)
        at org.apache.spark.rdd.RDD$$anonfun$mapPartitions$1$$anonfun$apply$17.apply(RDD.scala:686)
        at org.apache.spark.rdd.RDD$$anonfun$mapPartitions$1$$anonfun$apply$17.apply(RDD.scala:686)
        at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:35)

Here is my code:


```java
public static void main(String[] args)
        throws UnsupportedEncodingException, FileNotFoundException, IOException {
    int cpus = 30;
    SparkConf sparkConf = new SparkConf(true)
            .setAppName("SparkQueryApp")
            .setMaster("spark://10.0.0.206:7077")
            .set("es.nodes", "10.0.0.207")
            .set("es.nodes.discovery", "false")
            .set("es.cluster", "es-dev")
            .set("es.scroll.size", "5000")
            .setJars(JavaSparkContext.jarOfClass(Demo.class))
            .set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
            .set("spark.default.parallelism", String.valueOf(cpus * 2))
            .set("spark.logConf", "true");

    SparkContext sparkCtx = new SparkContext(sparkConf);
    SQLContext sqlContext = new SQLContext(sparkCtx);

    DataFrame df = JavaEsSparkSQL.esDF(sqlContext, "rt-logs/test");
    df.registerTempTable("Log");

    DataFrame sqlDFTest = sqlContext.sql(
            "SELECT created, count(instances) FROM Log where id  = 100000 group by instances");

    List<String> results = sqlDFTest.javaRDD().map(new Function<Row, String>() {

        static final long serialVersionUID = 1L;

        @Override
        public String call(Row row) throws Exception {
            return "Created" + row.getString(0) + " Count " + row.getInt(1);
        }
    }).collect();

    try (Writer writer = new BufferedWriter(
            new OutputStreamWriter(new FileOutputStream("result3.txt"), "utf-8"))) {

        for (String row : results) {
            writer.write(row);
        }

        writer.write(sqlDFTest.first().toString());
    }
}
```

Any help, please?

(Philip K. Adetiloye) #2

I found the problem: I was running Spark 1.4.1 while my Maven dependency was on 1.4.0.
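
For anyone hitting the same thing, here is a minimal sketch of a fail-fast check for this kind of mismatch. It assumes you supply both version strings yourself (for example, the version declared in your Maven dependency and the version your cluster reports). The names `SparkVersionCheck`, `sameVersion`, and `requireSameVersion` are hypothetical helpers for illustration and are not part of Spark:

```java
public class SparkVersionCheck {

    /** Returns true when both version strings are identical after trimming whitespace. */
    static boolean sameVersion(String buildVersion, String clusterVersion) {
        return buildVersion.trim().equals(clusterVersion.trim());
    }

    /** Throws IllegalStateException with a readable message when the versions differ. */
    static void requireSameVersion(String buildVersion, String clusterVersion) {
        if (!sameVersion(buildVersion, clusterVersion)) {
            throw new IllegalStateException(
                    "Spark version mismatch: compiled against " + buildVersion.trim()
                    + " but cluster runs " + clusterVersion.trim());
        }
    }

    public static void main(String[] args) {
        // Values from this thread: Maven dependency on 1.4.0, cluster on 1.4.1.
        try {
            requireSameVersion("1.4.0", "1.4.1");
        } catch (IllegalStateException e) {
            System.out.println(e.getMessage());
        }
    }
}
```

Calling something like this before creating the `SparkContext` would turn an obscure `ClassCastException` inside a task into an explicit message at startup. How the two version strings are obtained is left open here.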


(Costin Leau) #3

Looks like you have a configuration mismatch between your compiled code and your deployment; likely a Spark version mismatch.


(Costin Leau) #4

Doh, just saw your reply ...

