Crash when es-hadoop accesses multiple indices with a DataFrame


(jicaiyan) #1

I have created two indices named "rtyan1" and "rtyan2", and each index has only one type, named "rtyan".
When I query the data through a Spark SQL DataFrame, it crashes with an OOM error.

Environment:
Spark 1.5.1
elasticsearch-hadoop-2.1.2.jar (I also tried elasticsearch-spark_2.10-2.2.0-beta1.jar; it crashes too)

I use spark-shell for this test, started with:
./spark-shell --master spark://yy75:7077 --jars /opt/jobserver/elasticsearch-spark_2.10-2.2.0-beta1.jar

The code is:

import com.typesafe.config.{Config, ConfigFactory}
import org.apache.spark._
import org.apache.spark.SparkContext
import org.apache.spark.SparkContext._
import org.apache.spark.SparkConf
import org.apache.spark.sql.SQLContext
import org.apache.spark.sql.SQLContext._
import org.apache.spark.sql.Row
import org.apache.spark.sql.types.{StructType,StructField,StringType}
import scala.util.Try
import org.elasticsearch.spark.sql._

val sqlContext = new SQLContext(sc)
val tablename = "rtyan"
val querySql = "select count(*) FROM rtyan limit 5"

// Register the Elasticsearch-backed DataFrame as a temporary table
val options = Map("pushdown" -> "true", "es.port" -> "9200")
val df = sqlContext.read.format("org.elasticsearch.spark.sql").options(options).load("rtyan1,rtyan2/rtyan")
df.registerTempTable(tablename)

// Run the SQL statement through sqlContext
val ret = sqlContext.sql(querySql)
ret.show()

When I execute the above code in spark-shell, it crashes (OOM) and displays the following error message:

Exception in thread "dag-scheduler-event-loop" java.lang.OutOfMemoryError: GC overhead limit exceeded
    at org.elasticsearch.hadoop.rest.ShardSorter$PowerSet$1.get(ShardSorter.java:232)
    at org.elasticsearch.hadoop.rest.ShardSorter$PowerSet$1.get(ShardSorter.java:229)
    at org.elasticsearch.hadoop.rest.ShardSorter$ReverseIndexedListIterator.next(ShardSorter.java:339)
    at java.util.AbstractCollection.toArray(AbstractCollection.java:141)
    at java.util.ArrayList.<init>(ArrayList.java:164)
    at org.elasticsearch.hadoop.rest.ShardSorter.powerList(ShardSorter.java:202)
    at org.elasticsearch.hadoop.rest.ShardSorter.checkCombo(ShardSorter.java:89)
    at org.elasticsearch.hadoop.rest.ShardSorter.find(ShardSorter.java:85)
    at org.elasticsearch.hadoop.rest.RestRepository.doGetReadTargetShards(RestRepository.java:347)
    at org.elasticsearch.hadoop.rest.RestRepository.getReadTargetShards(RestRepository.java:290)
    at org.elasticsearch.hadoop.rest.RestService.findPartitions(RestService.java:252)
    at org.elasticsearch.spark.rdd.AbstractEsRDD.esPartitions$lzycompute(AbstractEsRDD.scala:61)
    at org.elasticsearch.spark.rdd.AbstractEsRDD.esPartitions(AbstractEsRDD.scala:60)
    at org.elasticsearch.spark.rdd.AbstractEsRDD.getPartitions(AbstractEsRDD.scala:27)
    at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:239)
    at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:237)
    at scala.Option.getOrElse(Option.scala:120)
    at org.apache.spark.rdd.RDD.partitions(RDD.scala:237)
    at org.apache.spark.rdd.MapPartitionsRDD.getPartitions(MapPartitionsRDD.scala:35)
    at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:239)
    at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:237)
    at scala.Option.getOrElse(Option.scala:120)
    at org.apache.spark.rdd.RDD.partitions(RDD.scala:237)
    at org.apache.spark.rdd.MapPartitionsWithPreparationRDD.getPartitions(MapPartitionsWithPreparationRDD.scala:40)
    at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:239)
    at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:237)
    at scala.Option.getOrElse(Option.scala:120)
    at org.apache.spark.rdd.RDD.partitions(RDD.scala:237)
    at org.apache.spark.rdd.MapPartitionsRDD.getPartitions(MapPartitionsRDD.scala:35)
    at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:239)
    at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:237)
    at scala.Option.getOrElse(Option.scala:120)

But when I change the code from:

val df = sqlContext.read.format("org.elasticsearch.spark.sql").options(options).load("rtyan1,rtyan2/rtyan")

to

val df = sqlContext.read.format("org.elasticsearch.spark.sql").options(options).load("rtyan1/rtyan")

it works: no crash, and the data is returned normally.
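
Since the single-index load works, one possible workaround to try is to load each index as its own DataFrame and merge the results. The sketch below is an assumption, not something confirmed in this thread: `loadEach` is a hypothetical helper, and whether per-index loads avoid the OOM in every setup is untested. It is written generically so that it compiles without Spark on the classpath.

```scala
// Hypothetical helper (not part of es-hadoop or Spark): load every resource
// separately, then merge the results pairwise from left to right.
// `load` and `union` are supplied by the caller.
def loadEach[A](resources: Seq[String])(load: String => A)(union: (A, A) => A): A = {
  require(resources.nonEmpty, "at least one resource is required")
  resources.map(load).reduce(union)
}

// Possible use in spark-shell with the Spark 1.5 DataFrame API, where
// `sqlContext` and `options` are the values defined in the code above:
//
// val df = loadEach(Seq("rtyan1/rtyan", "rtyan2/rtyan"))(
//   r => sqlContext.read.format("org.elasticsearch.spark.sql").options(options).load(r))(
//   _ unionAll _)
// df.registerTempTable("rtyan")
```

For an alias, the concrete index names behind it would have to be listed explicitly for this approach to apply.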

I also tried creating an alias (rtyanalias) for rtyan1 and rtyan2, and used this code:

val df = sqlContext.read.format("org.elasticsearch.spark.sql").options(options).load("rtyanalias/rtyan")

It also crashes (OOM).

So I think es-hadoop only supports mapping one DataFrame to one index. In our case, however, we want a DataFrame mapped to one alias, and this alias points to multiple indices (one index per day).
Is there any solution?

I hope to get your reply. Thank you very much.


(jicaiyan) #2

More information:
I use Elasticsearch 1.7.3.

Does anyone know the reason for this issue?
Is there any solution?
I have spent two days trying to figure it out...


(Costin Leau) #3

That looks like a bug - basically the algorithm for finding conflicts regarding any shard overlap is blowing up the memory.
What do your indices look like? How many shards are there in each one?

Can you post the relevant cat shards?
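
As a rough illustration of how this kind of blow-up can happen: the stack trace passes through ShardSorter.powerList and a PowerSet class, and a power set of n items has 2^n members. The sketch below is not es-hadoop's code, and this thread does not show what ShardSorter actually enumerates. `subsetCount` and `subsets` are hypothetical names used only to show the growth rate and a lazy alternative to holding every subset in memory at once.

```scala
// Number of subsets of a set with n elements: 2^n.
// BigInt avoids overflow for larger n.
def subsetCount(n: Int): BigInt = BigInt(2).pow(n)

// Enumerate subsets lazily, smallest first, so that only the subset
// currently being inspected has to be held in memory.
// Assumes the elements of `items` are distinct.
def subsets[A](items: Vector[A]): Iterator[Vector[A]] =
  (0 to items.size).iterator.flatMap(k => items.combinations(k))

println(subsetCount(10)) // 1024
println(subsetCount(40)) // 1099511627776
```

Copying such an iterator into a list, as the `ArrayList.<init>` frame in the trace suggests may be happening, would give up the benefit of lazy enumeration.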


(jicaiyan) #4

Hi Costin Leau,

Sorry for the late reply; I only saw your email tonight.
I created the indices rtyan1 and rtyan2 using default parameters, so I guess each index has 5 shards?
Each index holds only a few records.
I will send you detailed information for these two indices tomorrow when I am back at my office.

Thank you again.


(jicaiyan) #5

Relevant information for the indices rtyan1 and rtyan2 is as follows:
command:
curl localhost:9200/_cat/shards/rtyan1
output:
rtyan1 2 p STARTED 0 144b 10.xx.xx.60 analysis60
rtyan1 0 p STARTED 2 4.2kb 10.xx.xx.75 analysis11
rtyan1 3 p STARTED 0 144b 10.xx.xx.67 analysis1
rtyan1 1 p STARTED 1 4kb 10.xx.xx.113 analysis113
rtyan1 4 p STARTED 0 144b 10.xx.xx.58 analysis58

command :
curl localhost:9200/_cat/shards/rtyan2
output:
rtyan2 2 p STARTED 0 144b 10.xx.xx.65 analysis10
rtyan2 0 p STARTED 1 4kb 10.xx.xx.86 analysis86
rtyan2 3 p STARTED 1 4kb 10.xx.xx.59 analysis59
rtyan2 1 p STARTED 1 4kb 10.xx.xx.114 analysis114
rtyan2 4 p STARTED 0 144b 10.xx.xx.116 analysis116


(Costin Leau) #6

I've raised an issue to track it down. Can you please follow that issue (in case more information is needed)?

Cheers,


(jicaiyan) #7

OK, I have created an account on GitHub.
I will follow the issue.
Thank you again.

