Stress testing ES-Hadoop

We want to load a large Elasticsearch index into Apache Spark using the ES-Hadoop connector, but the index
is so big that loading it into Spark fails.

In most cases, the executor tasks spend most of their time in garbage collection and nothing gets cached
in memory, which is the whole point of loading our data into Spark.

Here is what our configuration looks like:

ES Index:

Index: sales-rpts8-201501
Docs: 22,951,540
Size: 114.4 GB

Spark Job

...
    .setAppName("BigIndexJob")
    .setMaster(SPARK_MASTER)
    .set("es.nodes", ES_NODES)
    .set("es.nodes.discovery", "false")
    .set("es.cluster", ES_CLUSTER)
    .set("es.scroll.size", ES_SCROLL_SIZE)
    .set("spark.logConf", "true")
    .set("spark.driver.memory", "5g")
    .set("spark.executor.memory", "30g")
    .set("spark.default.parallelism", "60")
    .set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")

Spark cluster (Standalone)

Cores: 128 Total, 128 Used
Memory: 499.5 GB Total, 120.0 GB Used
Applications: 1 Running, 12 Completed
Drivers: 0 Running, 0 Completed
Status: ALIVE

Any idea what we are doing wrong?
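
One thing that may be worth checking with the numbers above is whether the cache can hold the data at all. The sketch below is a back-of-the-envelope estimate only. It assumes the 120 GB in use corresponds to four 30g executors, and it assumes the legacy Spark 1.x storage fraction (0.6 with a 0.9 safety factor); both the helper name `cache_capacity_gb` and that fraction are assumptions, so check the memory settings of your Spark version. The on-disk index size is also just a proxy, since deserialized objects in the JVM are often larger than the data on disk.

```python
def cache_capacity_gb(executors, executor_memory_gb, storage_fraction):
    """Rough upper bound on memory available for cached RDD blocks."""
    return executors * executor_memory_gb * storage_fraction


# Figures taken from the post above.
index_gb = 114.4
used_memory_gb = 120.0
executor_memory_gb = 30.0

# Assumption: all memory in use belongs to equally sized executors.
executors = int(used_memory_gb // executor_memory_gb)

# Assumption: legacy storage fraction (0.6) times its safety factor (0.9).
storage_fraction = 0.6 * 0.9

capacity_gb = cache_capacity_gb(executors, executor_memory_gb, storage_fraction)

print(f"executors: {executors}")
print(f"approx. cache capacity: {capacity_gb:.1f} GB")
print(f"index size on disk:     {index_gb:.1f} GB")
print(f"fits in cache: {capacity_gb >= index_gb}")
```

If the estimate comes out below the index size, as it does under these assumptions, heavy garbage collection and missing cached blocks would not be surprising.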

Can I ask how you are benchmarking your application?

What's the error message? Are you sure you are allocating enough resources/memory to Spark? Is it the connector or Spark that breaks down with an OOM?

@costin We bumped up the executor memory and now it works on a standalone cluster, although it took almost 90 minutes to load all the data.

Now I am running it on YARN to test and am getting a different error message:

ge 0.0 failed 4 times, most recent failure: Lost task 3.3 in stage 0.0 (TID 10, mavencode.ca): java.lang.IllegalArgumentException: Size exceeds Integer.MAX_VALUE
        at sun.nio.ch.FileChannelImpl.map(FileChannelImpl.java:836)
        at org.apache.spark.storage.DiskStore$$anonfun$getBytes$2.apply(DiskStore.scala:125)
        at org.apache.spark.storage.DiskStore$$anonfun$getBytes$2.apply(DiskStore.scala:113)
        at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1285)
        at org.apache.spark.storage.DiskStore.getBytes(DiskStore.scala:127)
        at org.apache.spark.storage.DiskStore.getBytes(DiskStore.scala:134)
        at org.apache.spark.storage.BlockManager.doGetLocal(BlockManager.scala:511)
        at org.apache.spark.storage.BlockManager.getLocal(BlockManager.scala:429)
        at org.apache.spark.storage.BlockManager.get(BlockManager.scala:617)
        at org.apache.spark.CacheManager.getOrCompute(CacheManager.scala:44)
        at org.apache.spark.rdd.RDD.iterator(RDD.scala:242)
        at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:35)
        at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:277)
        at org.apache.spark.rdd.RDD.iterator(RDD.scala:244)
        at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:35)

Still pulling my hair out trying to figure out how to fix this.
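
For context, the trace shows `FileChannelImpl.map` rejecting the request, and a memory-mapped buffer in Java cannot exceed `Integer.MAX_VALUE` bytes (about 2 GB). So a single cached block being read back from disk appears to be larger than that. A common workaround is to spread the data over more partitions so that each block stays well below the limit. Here is a rough sizing sketch; the helper name `min_partitions` and the 512 MiB target are assumptions for illustration, "GB" is read as GiB, and the on-disk index size is only a proxy for the size of the cached RDD.

```python
INT_MAX = 2**31 - 1  # Java's Integer.MAX_VALUE, the largest mappable block in bytes


def min_partitions(total_bytes, max_block_bytes):
    """Smallest partition count that keeps the average block at or under the limit."""
    if max_block_bytes <= 0:
        raise ValueError("max_block_bytes must be positive")
    # Ceiling division on integers.
    return -(-total_bytes // max_block_bytes)


# 114.4 GB index from the post above, read as GiB (assumption).
index_bytes = int(114.4 * 1024**3)

# Absolute floor: any fewer partitions and the average block already exceeds the limit.
floor_partitions = min_partitions(index_bytes, INT_MAX)

# Hypothetical safer target of 512 MiB per partition, leaving room for skew.
target_partitions = min_partitions(index_bytes, 512 * 1024**2)

print(f"partitions needed at the 2 GB limit: {floor_partitions}")
print(f"partitions for ~512 MiB blocks:      {target_partitions}")
```

These are averages, so uneven partitions can still exceed the limit; a count comfortably above the floor is the safer choice.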

Good luck :wink:
I recommend reaching out to the Spark mailing list. Do note that while YARN is an option, there are other deployment modes that are easier to use.
As for the load time, 90 minutes is a lot for a hundred gig - have you done any monitoring to see what slows things down?
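
To put that in numbers, here is a quick calculation using the figures from the first post. It is only a sketch: the function name `load_throughput` is made up for illustration, and "GB" is read as GiB. Under those assumptions the load works out to roughly 22 MiB/s and about 4,250 docs/s for the whole cluster.

```python
def load_throughput(total_gib, docs, minutes):
    """Average throughput of a bulk load, from total size, doc count and duration."""
    seconds = minutes * 60
    return {
        "mib_per_s": total_gib * 1024 / seconds,
        "docs_per_s": docs / seconds,
        "avg_doc_bytes": total_gib * 1024**3 / docs,
    }


# Index size, document count and load time reported in this thread.
stats = load_throughput(total_gib=114.4, docs=22_951_540, minutes=90)

for name, value in stats.items():
    print(f"{name}: {value:.1f}")
```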

Thanks :smiley:
What other deployment options, apart from standalone, Mesos and YARN, are easier to use?
We've tried standalone, but we are using YARN now because it helps us utilize our cluster resources more efficiently.

On Elasticsearch, yes - we use kopf and Marvel.
For Spark we don't use any monitoring tool, but it would be nice to know what's going on while the data is loading and why it takes so much time.
Which tool would you recommend for monitoring Spark?

It's best to seek answers to these questions on the Spark mailing list / forum.
I hear good things about Mesos - there's a perception that it is not as mature, but it does come with some nice tools for managing and monitoring even though it's fairly young. So if you have time/bandwidth, try playing around with it.
Of course there's also the Databricks offering (namely Spark in the cloud).

I'm not aware of any proper tool to monitor Spark - other than Databricks itself, which is not fully available (as far as I know).