Spark tuning for Elasticsearch - how to increase Index/Ingest throughput

Hello everyone,
I would like to understand the relationship between Spark executors and cores on the one hand and the Elasticsearch batch size on the other, and how to tune a Spark job optimally to get better index throughput.

I have around 3.5B documents stored in Parquet format that I would like to ingest into Elasticsearch, and I'm not getting an index rate above 20K. I sometimes see 60K-70K, but it drops back down immediately; on average I get around 15K-25K documents indexed per second.

Little bit more details about my input:

  • Around 22,000 files in Parquet format
  • It contains around 3.2B records (around 3TB in size)
  • Currently running 18 executors (3 executors per node)

Details about my current ES setup:

  • 8 nodes, 1 master and 7 data nodes
  • Instance type: c4.8xlarge
  • Index with 70 shards
  • Index contains 49 fields (none of them are analyzed)
  • No replication
  • "indices.store.throttle.type" : "none"
  • "refresh_interval" : "-1"
  • es.batch.size.bytes: 100M (I tried with 200M also)
  • es.batch.size.entries: 10000 (I tried with different value also)

I tried different numbers of partitions and different combinations of executors and cores, but didn't see enough of a performance gain.
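For context, here is roughly how my write path is wired up. This is a simplified sketch rather than my exact job: the S3 path, ES host, index name, and partition count below are placeholders, and the es.* options are the elasticsearch-hadoop connector settings listed above.

    import org.apache.spark.sql.SparkSession

    object ParquetToEs {
      def main(args: Array[String]): Unit = {
        val spark = SparkSession.builder().appName("parquet-to-es").getOrCreate()

        // Read the Parquet input (placeholder path).
        val df = spark.read.parquet("s3://my-bucket/parquet/")

        // Each Spark task keeps its own bulk buffer, so the number of concurrent
        // bulk requests is roughly (executors * cores), and the total pressure on
        // the cluster is concurrent tasks * es.batch.size.*.
        df.repartition(140) // placeholder: ~2 concurrent tasks per shard for 70 shards
          .write
          .format("org.elasticsearch.spark.sql")
          .option("es.nodes", "es-data-node-1")     // placeholder host
          .option("es.batch.size.bytes", "100mb")   // bulk flush threshold by size
          .option("es.batch.size.entries", "10000") // bulk flush threshold by doc count
          .mode("append")
          .save("myindex/doc")                      // placeholder index/type
      }
    }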

I'm very new to Elasticsearch, so I'm not sure how to tune my Spark job for better performance. Any guidance would be highly appreciated.


Hi, I'm also facing the same issue. Any update on the ideal design?

I had a similar problem and found two big wins.

  1. Set the translog durability to async. My application only accepts data from Spark, so I'm not worried about losing data and async works for me. Before setting this I was getting errors like "Elasticsearch is too busy" and bulk queue full rejections.

    PUT /devices/_settings
    {
      "index": {
        "refresh_interval": "-1",
        "translog": {
          "durability": "async",
          "sync_interval": "5s"
        }
      }
    }

  2. Another big win was to dedicate as many threads as possible to the bulk thread pool. Note that query performance will suffer depending on how many threads you dedicate to it.

    PUT /_cluster/settings
    {
      "transient": {
        "indices.store.throttle.type": "none"
      },
      "persistent": {
        "threadpool.bulk.type": "fixed",
        "threadpool.bulk.size": 60,
        "threadpool.bulk.queue_size": 3000,
        "threadpool.generic.keep_alive": "5m"
      }
    }

I staged all of my aggregated data, since it's fairly easy to overload your Elasticsearch cluster with Spark. I would start with 1 executor and scale up from there. My Spark cluster for loading data matches the number of data nodes in my ES cluster, and I'm using m3 nodes since I'm just reading Parquet and pushing to ES.
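As an illustration of the "start small and scale up" approach, the staged load looks roughly like this. The path, host, index name, and per-node task count are placeholders rather than my exact job:

    import org.apache.spark.sql.SparkSession

    val spark = SparkSession.builder().appName("staged-es-load").getOrCreate()

    // Pre-staged (aggregated) data, read from a placeholder location.
    val staged = spark.read.parquet("s3://my-bucket/staged/")

    // Cap the number of concurrent bulk writers in proportion to the ES data
    // nodes, e.g. 7 data nodes * 5 writer tasks each. Start low, then raise it
    // while watching bulk rejections and data-node CPU.
    val writeTasks = 7 * 5

    staged.coalesce(writeTasks)
      .write
      .format("org.elasticsearch.spark.sql")
      .option("es.nodes", "es-data-node-1") // placeholder host
      .option("es.batch.size.bytes", "8mb") // small bulks, matching the submit command below
      .option("es.batch.size.entries", "0") // let the byte limit alone cap each bulk
      .mode("append")
      .save("devices/doc")                  // placeholder index/type

I use coalesce rather than repartition here because reducing the partition count doesn't need a full shuffle; the point is simply to limit how many tasks hit the cluster at once.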

I'm achieving bursts of up to 500K-800K documents per second with this configuration. The difficult thing is sustaining that level for long periods of time; I believe my average rate is around 200K docs per second.

    spark-submit --deploy-mode cluster --master yarn \
      --num-executors 5 \
      --executor-cores 8 \
      --executor-memory 20g \
      --conf spark.es.nodes= \
      --conf spark.es.batch.size.bytes=8mb \
      --conf spark.es.batch.size.entries=0 \
      --packages org.elasticsearch:elasticsearch-spark-20_2.11:5.0.0 \

One last thing: in EMR I am not setting maximizeResourceAllocation to true, because we want to control the number of Spark tasks ourselves.