Elasticsearch + Spark read performance issues

I have a simple application that reads IDs from ES and returns the number of distinct IDs using Spark.
My configuration is:

  • ES 2.2 (cluster)
    2 client nodes, 3 master nodes, 4 data nodes (16 GB RAM, 8 GB heap)

  • elasticsearch-hadoop-2.2.0

  • Spark 1.6.1 (cluster) with Scala 2.10.5
    1 master (2 CPUs at 2.90 GHz, 4 GB RAM) and 3 workers (2 CPUs at 2.40 GHz, 8 GB RAM)

I'm having performance issues with Spark: the program takes the same time (about 1 minute) whether it runs on 1, 2 or 3 workers. How can I improve read performance from ES and make proper use of all the workers?
ES and Spark run on different servers. Would it be better to install Spark on the ES servers to reduce data transfer?

Scala code:

```scala
import org.apache.spark.{SparkConf, SparkContext}
import org.elasticsearch.spark._ // adds esRDD to SparkContext

object ElasticsearchTest {

  def main(args: Array[String]): Unit = {
    val sparkConf = new SparkConf()
      .setAppName("elasticTest")
      .set("es.nodes", "")
      .set("spark.executor.memory", "6G")
      .set("spark.executor.cores", "2")
      .set("spark.driver.cores", "2")
      .set("spark.cores.max", "6")
    val sc = new SparkContext(sparkConf)

    // Return only obj_id for documents whose date falls in the requested range
    val query = """{"fields":["obj_id"],"query":{"bool":{"filter":{"bool":{"must":[{"range":{"date":{"gte":"20150101","lte":"20150631"}}}]}}}}}"""

    // esRDD yields (documentId, Map[String, AnyRef]) pairs
    val rdd = sc.esRDD("myindex/mytype", query)

    val countVal = rdd.map { case (_, value) => value("obj_id") }

    // distinct() shuffles the IDs across the cluster before counting
    val docCount = countVal.distinct().count()
    println("Count of distinct objects with query : %s".format(docCount))

    sc.stop()
  }
}
```
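The `distinct()` at the end of the job is where data moves between executors: in Spark 1.6, `RDD.distinct()` is built on `reduceByKey`, so every ID is hash-partitioned and sent to the task that owns its key. The sketch below simulates that with plain Scala collections and no Spark dependency. The four inner sequences stand in for the four ES-backed partitions, and `DistinctShuffleSketch`, `partitionOf` and `simulate` are hypothetical names. It assumes the map-side combine that `reduceByKey` performs by default, so each input partition sends each of its IDs at most once.

```scala
// Hypothetical, stdlib-only simulation of the shuffle behind RDD.distinct().
// Plain Scala collections stand in for Spark partitions.
object DistinctShuffleSketch {

  // Non-negative hash routing, in the spirit of Spark's HashPartitioner.
  def partitionOf(key: String, numPartitions: Int): Int = {
    val mod = key.hashCode % numPartitions
    if (mod < 0) mod + numPartitions else mod
  }

  /** Returns (records sent through the shuffle, number of distinct keys). */
  def simulate(input: Seq[Seq[String]], numPartitions: Int): (Int, Int) = {
    // Map side: each input partition first drops its own duplicates.
    val combined: Seq[Set[String]] = input.map(_.toSet)
    val shuffledRecords = combined.map(_.size).sum

    // Reduce side: every key is routed to exactly one output partition,
    // which removes duplicates that came from different input partitions.
    val outputs: Map[Int, Seq[String]] =
      combined.flatten.groupBy(k => partitionOf(k, numPartitions))
    val distinctCount = outputs.values.map(_.toSet.size).sum

    (shuffledRecords, distinctCount)
  }

  def main(args: Array[String]): Unit = {
    // Four toy input partitions, one per shard of a 4-shard index.
    val input = Seq(Seq("a", "b", "a"), Seq("b", "c"), Seq("c", "c", "d"), Seq("a"))
    val (shuffled, distinct) = simulate(input, numPartitions = 4)
    println(s"9 input records, $shuffled shuffled, $distinct distinct")
  }
}
```

On this toy input, 7 of the 9 records cross the shuffle; with 300,000 mostly unique IDs, nearly every record would.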

[Attachments: stack trace with 1 worker; stack trace with 3 workers]


Yes, when Spark runs on the same nodes as ES, the connector will try to read data from the local node. It might be that shuffling the data over the network is the bottleneck; you should monitor that.
Also, how many shards does that index have? The connector creates one Spark partition, and therefore one read task, per shard, so you should have at least one shard per worker.
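A back-of-the-envelope model of that advice follows, assuming (as the reply states) one read task per shard and, with Spark's default of one CPU per task, one running task per executor core. With `spark.executor.cores` set to 2 per worker, a 4-shard index keeps at most 4 cores busy during the read, however many workers are added. `ReadParallelism` and its methods are hypothetical names; the model ignores data locality, task skew and the later shuffle stage.

```scala
// Hypothetical model: one read task per shard, one running task per core.
object ReadParallelism {

  /** Read tasks that can run at the same time. */
  def concurrentTasks(shards: Int, totalCores: Int): Int =
    math.min(shards, totalCores)

  /** Sequential "waves" of tasks needed to read every shard. */
  def waves(shards: Int, totalCores: Int): Int =
    (shards + totalCores - 1) / totalCores

  def main(args: Array[String]): Unit = {
    val shards = 4         // shards in the index
    val coresPerWorker = 2 // spark.executor.cores
    for (workers <- 1 to 3) {
      val cores = workers * coresPerWorker
      val running = concurrentTasks(shards, cores)
      println(s"$workers worker(s): $cores cores, $running concurrent read tasks, " +
        s"${waves(shards, cores)} wave(s), ${cores - running} idle core(s)")
    }
  }
}
```

For 1, 2 and 3 workers this gives 2, 4 and 4 concurrent read tasks; with 3 workers, two of the six cores sit idle during the read.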

Hi Henrik
Thanks for your help. Indeed, it seems that my problem comes from data shuffling. My index has 4 shards and the ES query retrieves 300,000 documents (about 1 GB of data). I'll try one shard per worker to see if it improves performance. I also found this topic, in case it helps someone else.
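To put those numbers in perspective, here is a quick estimate of how much each read task handles for a few shard counts. It assumes the 300,000 documents (about 1 GB, taken here as 1,000 MB) are spread evenly across shards, which real indices only approximate. `PartitionSizing` is a hypothetical helper.

```scala
// Back-of-envelope sizing; assumes an even spread of documents across shards.
object PartitionSizing {
  val totalDocs: Long = 300000L // documents returned by the query
  val totalMB: Double = 1000.0  // "about 1 GB", rounded to 1,000 MB

  // Documents read by the task for one shard (rounded up).
  def docsPerPartition(shards: Int): Long = (totalDocs + shards - 1) / shards

  // Megabytes read by the task for one shard.
  def mbPerPartition(shards: Int): Double = totalMB / shards

  def main(args: Array[String]): Unit =
    for (shards <- Seq(3, 4, 6))
      println(f"$shards%d shards: ~${docsPerPartition(shards)}%d docs, ~${mbPerPartition(shards)}%.0f MB per task")
}
```

With 4 shards each task reads roughly 75,000 documents (about 250 MB); 6 shards would bring that to about 50,000 documents per task, one task for each of the 6 cores allowed by `spark.cores.max`.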