Performance of Spark bulk index to Elasticsearch

I have a Spark app that uses Elasticsearch-Hadoop to bulk index logs. My log file has 80 million records, and the app takes 3 hours to complete. I am looking for ways to tune my Spark, Elasticsearch-Hadoop, and/or Elasticsearch configuration to reduce this time.

Elasticsearch:

  • I am using v5.4.3.
  • I have a 3-node cluster, each node on a separate machine.

Machine specs:

  • CentOS 6.7
  • Intel(R) Xeon(R) CPU E5-2450 0 @ 2.10GHz
  • 32 processors
  • 64GB memory
  • 340GB disk space
  • spinning disks

Configuration:

Elasticsearch-Hadoop:

  • I am using v5.4.3.

Configuration:

  • es.batch.size.bytes: 6mb
  • es.batch.size.entries: 6000
  • es.batch.write.refresh: false

Spark:

  • I am using Spark v2.1.0, Scala v2.11.7.
  • I use 21 executors, 1 core per executor. Elasticsearch-Hadoop sends up to 21 batches in parallel.
  • The app uses SparkContext.textFile to read the log file, maps over the RDD to transform each log line into a case class (the Elasticsearch document), and then calls EsSpark.saveToEsWithMeta to write the RDD to Elasticsearch (a sketch of this flow follows the list).
  • I am generating my own document ids because I want my app to be idempotent.
  • Each document contains 21 fields, and is on average 2kb. None of the fields are analyzed. The _all field is disabled.
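
To make the flow concrete, here is a hedged sketch of the read → transform → saveToEsWithMeta pipeline described above, reusing the SparkContext configured in the earlier sketch. The document class, parser, id scheme, input path, and "logs/doc" index/type are all simplified placeholders (the real document has 21 fields).

```scala
import org.apache.spark.rdd.RDD
import org.elasticsearch.spark.rdd.EsSpark

// Simplified stand-in for the real 21-field document.
case class LogDoc(timestamp: String, host: String, message: String)

// Hypothetical parser; the real transformation depends on the log format.
def parse(line: String): LogDoc = {
  val Array(ts, host, msg) = line.split("\t", 3)
  LogDoc(ts, host, msg)
}

// `sc` is the SparkContext from the configuration sketch above; the input path is a placeholder.
val lines: RDD[String] = sc.textFile("hdfs:///logs/app.log")

// Pair each document with a deterministic id so re-runs overwrite instead of duplicating.
val docsWithIds: RDD[(String, LogDoc)] = lines.map { line =>
  val doc = parse(line)
  (s"${doc.host}-${doc.timestamp}", doc) // hypothetical id scheme
}

// The pair key becomes the document _id; "logs/doc" is a placeholder index/type.
EsSpark.saveToEsWithMeta(docsWithIds, "logs/doc")
```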

Index settings:

  • index.codec: best_compression
  • index.translog.durability: async
  • index.merge.scheduler.max_thread_count: 1
  • index.number_of_replicas: 0
  • index.refresh_interval: -1
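
For context, these settings can be applied at index-creation time through the create index API. Below is a sketch using only the JDK HTTP client; the node address and the index name "logs" are placeholders.

```scala
import java.net.{HttpURLConnection, URL}
import java.nio.charset.StandardCharsets

// Index settings from the list above, sent in the create index request body.
val body =
  """{
    |  "settings": {
    |    "index.codec": "best_compression",
    |    "index.translog.durability": "async",
    |    "index.merge.scheduler.max_thread_count": 1,
    |    "index.number_of_replicas": 0,
    |    "index.refresh_interval": "-1"
    |  }
    |}""".stripMargin

// Placeholder node address and index name.
val conn = new URL("http://es-node-1:9200/logs")
  .openConnection()
  .asInstanceOf[HttpURLConnection]
conn.setRequestMethod("PUT")
conn.setDoOutput(true)
conn.setRequestProperty("Content-Type", "application/json")
conn.getOutputStream.write(body.getBytes(StandardCharsets.UTF_8))
println(s"Create index returned HTTP ${conn.getResponseCode}")
```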

Monitoring:

  • I am using the X-Pack monitoring plugin and checking the dashboards in Kibana.
  • Based on the dashboards, I'm not approaching memory or CPU limits, and I'm not getting throttled by Elasticsearch. However, my indexing rate seems capped at 15,000 docs/s. If I continue to increase the batch size or the number of executors, the request rate increases, but the request time also increases, and the overall indexing rate worsens.
  • I am not sure how to see I/O usage via the X-Pack monitoring plugin, but I suspect my problem is that my servers cannot write to disk quickly enough.
  • One interesting thing I noticed was that, using default settings for everything, I got a faster indexing rate with two servers than with one (15,000 docs/s vs. 10,000 docs/s), but a slower rate with three (5,000 docs/s). With three servers, I had to increase the batch sizes to achieve a rate similar to what I got with two.

The one major recommendation I've heard about, but haven't tried, is getting better hardware (e.g. using SSDs instead of spinning disks). I'd like to first make sure I've done what I can configuration-wise before I invest in better hardware. Any advice given is appreciated. Thank you!

@guest444 Have you done any runs or measurements varying the number of partitions in the index?

@james.baiera I've tried reducing the number of shards in the index from 5 to 3, with no noticeable effect. On the Spark side, I've tried decreasing rdd.partitions from 404 to 21 (the number of executors I'm using) via rdd.repartition(21); this also had no noticeable effect. My concurrency seems limited by the number of executors: at any given time I have at most 21 tasks running, despite having 404 total tasks.
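
For clarity, the repartitioning I tried amounts to something like the following, reusing the (id, document) pair RDD from the earlier sketch:

```scala
// Match the number of partitions to the 21 executors before writing;
// `docsWithIds` is the (id, document) pair RDD from the earlier sketch.
val repartitioned = docsWithIds.repartition(21)
EsSpark.saveToEsWithMeta(repartitioned, "logs/doc") // placeholder index/type
```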
