I am having trouble running my job on my spark Cluster.
Each time I ran the job, I keep getting this error:
Exception in thread "main" org.apache.spark.SparkException: Job aborted due to stage failure: Task 3 in stage 0.0 failed 4 times, most recent failure: Lost task
3.3 in stage 0.0 (TID 12, 10.0.0.207): java.lang.ClassCastException: scala.coll
ection.Iterator$$anon$11 cannot be cast to scala.Tuple2
at org.apache.spark.sql.DataFrame$$anonfun$33.apply(DataFrame.scala:1189
)
at org.apache.spark.rdd.RDD$$anonfun$mapPartitions$1$$anonfun$apply$17.a
pply(RDD.scala:686)
at org.apache.spark.rdd.RDD$$anonfun$mapPartitions$1$$anonfun$apply$17.a
pply(RDD.scala:686)
at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:
35)
Here is my code:
public static void main(String[] args)
throws UnsupportedEncodingException, FileNotFoundException, IOException
{
int cpus = 30;
SparkConf sparkConf = new SparkConf(true).setAppName("SparkQueryApp").setMaster("spark://10.0.0.206:7077").set("es.nodes", "10.0.0.207").set("es.nodes.discovery", "false").set("es.cluster", "es-dev").set("es.scroll.size", "5000").setJars(JavaSparkContext.jarOfClass(Demo.class)).set("spark.serializer", "org.apache.spark.serializer.KryoSerializer").set("spark.default.parallelism", String.valueOf(cpus * 2)).set("spark.logConf", "true");
SparkContext sparkCtx = new SparkContext(sparkConf);
SQLContext sqlContext = new SQLContext(sparkCtx);
DataFrame df = JavaEsSparkSQL.esDF(sqlContext, "rt-logs/test");
df.registerTempTable("Log");
DataFrame sqlDFTest = sqlContext.sql("SELECT created, count(instances) FROM Log where id = 100000 group by instances");
List<String> results = sqlDFTest.javaRDD().map(new Function<Row, String>(){
static final long serialVersionUID = 1L;
@Override
public String call(Row row) throws Exception {
return "Created" + row.getString(0) + " Count " + row.getInt(1);
}
}).collect();
try (Writer writer = new BufferedWriter(new OutputStreamWriter(new FileOutputStream("result3.txt"), "utf-8"))) {
for (String row : results) {
writer.write(row);
}
writer.write(sqlDFTest.first().toString());
}
}
```
Any help please ?