I have created two indices named "rtyan1" and "rtyan2"; each index has a single type named "rtyan".
When I query the data through a Spark SQL DataFrame, the job crashes with an OOM.
Environment:
Spark 1.5.1
elasticsearch-hadoop-2.1.2.jar (I also tried elasticsearch-spark_2.10-2.2.0-beta1.jar; it crashes too)
I used spark-shell to run this test, with the command:
./spark-shell --master spark://yy75:7077 --jars /opt/jobserver/elasticsearch-spark_2.10-2.2.0-beta1.jar
and the code is:
import org.apache.spark.sql.SQLContext
import org.elasticsearch.spark.sql._

val sqlContext = new SQLContext(sc)
val tablename = "rtyan"
val querySql = "SELECT count(*) FROM rtyan LIMIT 5"

// register the table with the SQLContext
val options = Map("pushdown" -> "true", "es.port" -> "9200")
val df = sqlContext.read.format("org.elasticsearch.spark.sql").options(options).load("rtyan1,rtyan2/rtyan")
df.registerTempTable(tablename)

// run the SQL statement through the SQLContext
val ret = sqlContext.sql(querySql)
ret.show()
When I execute the code above in spark-shell, it crashes with an OOM and prints the following error message:
Exception in thread "dag-scheduler-event-loop" java.lang.OutOfMemoryError: GC overhead limit exceeded
at org.elasticsearch.hadoop.rest.ShardSorter$PowerSet$1.get(ShardSorter.java:232)
at org.elasticsearch.hadoop.rest.ShardSorter$PowerSet$1.get(ShardSorter.java:229)
at org.elasticsearch.hadoop.rest.ShardSorter$ReverseIndexedListIterator.next(ShardSorter.java:339)
at java.util.AbstractCollection.toArray(AbstractCollection.java:141)
at java.util.ArrayList.<init>(ArrayList.java:164)
at org.elasticsearch.hadoop.rest.ShardSorter.powerList(ShardSorter.java:202)
at org.elasticsearch.hadoop.rest.ShardSorter.checkCombo(ShardSorter.java:89)
at org.elasticsearch.hadoop.rest.ShardSorter.find(ShardSorter.java:85)
at org.elasticsearch.hadoop.rest.RestRepository.doGetReadTargetShards(RestRepository.java:347)
at org.elasticsearch.hadoop.rest.RestRepository.getReadTargetShards(RestRepository.java:290)
at org.elasticsearch.hadoop.rest.RestService.findPartitions(RestService.java:252)
at org.elasticsearch.spark.rdd.AbstractEsRDD.esPartitions$lzycompute(AbstractEsRDD.scala:61)
at org.elasticsearch.spark.rdd.AbstractEsRDD.esPartitions(AbstractEsRDD.scala:60)
at org.elasticsearch.spark.rdd.AbstractEsRDD.getPartitions(AbstractEsRDD.scala:27)
at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:239)
at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:237)
at scala.Option.getOrElse(Option.scala:120)
at org.apache.spark.rdd.RDD.partitions(RDD.scala:237)
at org.apache.spark.rdd.MapPartitionsRDD.getPartitions(MapPartitionsRDD.scala:35)
at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:239)
at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:237)
at scala.Option.getOrElse(Option.scala:120)
at org.apache.spark.rdd.RDD.partitions(RDD.scala:237)
at org.apache.spark.rdd.MapPartitionsWithPreparationRDD.getPartitions(MapPartitionsWithPreparationRDD.scala:40)
at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:239)
at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:237)
at scala.Option.getOrElse(Option.scala:120)
at org.apache.spark.rdd.RDD.partitions(RDD.scala:237)
at org.apache.spark.rdd.MapPartitionsRDD.getPartitions(MapPartitionsRDD.scala:35)
at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:239)
at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:237)
at scala.Option.getOrElse(Option.scala:120)
From the stack trace, the OOM appears to happen while es-hadoop computes the read partitions (ShardSorter.powerList), before any data is fetched. But when I change the line:
val df = sqlContext.read.format("org.elasticsearch.spark.sql").options(options).load("rtyan1,rtyan2/rtyan")
to
val df = sqlContext.read.format("org.elasticsearch.spark.sql").options(options).load("rtyan1/rtyan")
it works: there is no crash, and the data is returned normally.
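Since reading a single index works, a possible interim workaround I am considering is to read each daily index into its own DataFrame and union them. This is an untested sketch (it assumes both indices share the same schema):

// Untested sketch: read each daily index separately (the path that works)
// and combine the results. Assumes both indices have the same schema.
val dailyIndices = Seq("rtyan1", "rtyan2")
val frames = dailyIndices.map { idx =>
  sqlContext.read.format("org.elasticsearch.spark.sql")
    .options(options)
    .load(s"$idx/rtyan")
}
// unionAll (Spark 1.5 DataFrame API) concatenates DataFrames with matching schemas
val unionDf = frames.reduce(_ unionAll _)
unionDf.registerTempTable(tablename)

But this only pushes the problem into my own code; I would still prefer a single read over all the daily indices.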
I also tried creating an alias (rtyanalias) over rtyan1 and rtyan2, and using:
val df = sqlContext.read.format("org.elasticsearch.spark.sql").options(options).load("rtyanalias/rtyan")
It also crashes with an OOM.
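For reference, I created the alias with the standard _aliases API, roughly like this (the host is taken from the spark-shell command above):

curl -XPOST 'http://yy75:9200/_aliases' -d '{
  "actions": [
    { "add": { "index": "rtyan1", "alias": "rtyanalias" } },
    { "add": { "index": "rtyan2", "alias": "rtyanalias" } }
  ]
}'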
So it seems es-hadoop only supports mapping one DataFrame to a single index. In our case, however, we want one DataFrame to map to one alias that points to multiple indices (one index per day).
Is there any solution?
I hope to hear back from you; thank you very much.