Hi,
my Spark job hangs while computing the union of two RDDs after an aggregateByKey operation.
I have the following index:
{ "test_index": { "aliases": {}, "mappings": { "keyValue": { "properties": { "key": { "type": "string", "index": "not_analyzed" }, "value": { "type": "double" } } } }, "settings": { "index": { "creation_date": "1469795011108", "number_of_shards": "5", "number_of_replicas": "1", "uuid": "6HIbbHcLSv6pctgbh1mt9A", "version": { "created": "2030299" } } }, "warmers": {} } }
and the following code, where the union of the two RDDs hangs ONLY after an aggregateByKey operation:
```scala
import org.apache.spark.{SparkConf, SparkContext}
import org.elasticsearch.spark._

val configuration = new SparkConf()
  .setAppName("ES Spark Test Application")
  .setMaster("local[4]")
val sparkContext = new SparkContext(configuration)

// Two identical RDDs read from Elasticsearch, mapped to (key, 1) pairs
val firstEsRDD = sparkContext.esRDD("test_index/keyValue")
  .map { case (id, data) => (data("key").asInstanceOf[String], 1) }
val secondEsRDD = sparkContext.esRDD("test_index/keyValue")
  .map { case (id, data) => (data("key").asInstanceOf[String], 1) }

secondEsRDD.union(firstEsRDD).collect().foreach(println) // works
secondEsRDD.aggregateByKey(0)(_ + _, _ + _).collect().foreach(println) // works
secondEsRDD.aggregateByKey(0)(_ + _, _ + _).union(firstEsRDD).collect().foreach(println) // hangs

// The same pattern on plain in-memory RDDs works fine
val firstRegularRDD = sparkContext.parallelize(Array(("a", 1), ("a", 2), ("b", 2)))
val secondRegularRDD = sparkContext.parallelize(Array(("a", 1), ("a", 2), ("b", 2)))
firstRegularRDD.union(secondRegularRDD).collect().foreach(println) // works
firstRegularRDD.aggregateByKey(0)(_ + _, _ + _).collect().foreach(println) // works
firstRegularRDD.aggregateByKey(0)(_ + _, _ + _).union(secondRegularRDD).collect().foreach(println) // works
```
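One thing I have been considering to narrow this down (untested, just a sketch) is forcing the aggregation to materialize before the union, in case the interaction between the shuffle and the Elasticsearch input splits is the problem:

```scala
// Untested workaround sketch: cache and force evaluation of the
// aggregation first, then union the already-materialized result.
val aggregated = secondEsRDD.aggregateByKey(0)(_ + _, _ + _).cache()
aggregated.count() // forces the aggregation stage to run on its own
aggregated.union(firstEsRDD).collect().foreach(println)
```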
Spark version is 1.6.1. ES-Hadoop version is 2.3.1.
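For reference, the dependencies look roughly like this in sbt (standard artifact names assumed for these versions, with Spark 1.6.1's default Scala 2.10):

```scala
libraryDependencies ++= Seq(
  "org.apache.spark" %% "spark-core" % "1.6.1",
  "org.elasticsearch" % "elasticsearch-spark_2.10" % "2.3.1"
)
```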
Any suggestions on what I am doing wrong are more than welcome.