Spark Bulk Import Performance Benchmarks

I've been using the ES-Hadoop package for Spark for the past six months and I've never been able to sustain a bulk indexing rate over 70k docs per second.

Six months ago, with Spark 1.6 and ES 2.4, we had an indexing rate of 10k to 20k docs per second. That cluster had dedicated master and client nodes (m3.large) with 6 data nodes (c3.2xlarge). This Marvel graph shows our indexing rate from one of our tests.

Since then we have updated to emr-5.2.0 with Spark 2.0, es-hadoop 5.1.1, and Elasticsearch 5.2.0. However, we have hit a glass ceiling of ~70k docs indexed per second.

Many of the bulk-loading articles state that you should start with one node, one shard, and one Spark partition to understand your baseline performance. We ran over 30 tests and found our optimal rate is ~73k docs per second with two nodes. Adding nodes simply decreased performance. Here are those tests.

So our next test will use a cluster with dedicated master, ingest and data nodes. We're using i2.2xlarge instances for all of them for this test.

During bulk loading the 5 ingest nodes show relatively high spikes in CPU, but the 15-minute load average is around 1.5. The data nodes run at a higher level of CPU usage, 60-80%. I'm also monitoring I/O wait times with sysstat's sar, and I never see any I/O wait.

So the bottleneck isn't disk I/O and appears to be CPU. Separating ingest nodes from data nodes lets us scale them independently, and it looks like we need more ingest nodes than data nodes.

However, every time we scale the cluster and attempt to push more data over from Spark we end up getting the same indexing rate.

Other notes:

  • We ensure shards are evenly distributed across the nodes
  • We have tested different numbers of shards per node, but that has very little effect on performance
  • We have been controlling Spark parallelism with DataFrame repartitioning (a rough sketch of the write path follows this list). I believe that is working well.
  • We verified that I/O on the Spark side was not an issue.
  • On the Spark side we are running 10x r3.2xlarge nodes
  • I recently ran with 11 Spark nodes against a cluster of 5x ingest, 2x master and 10x data nodes, with the same results.
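
For reference, the write path is roughly the sketch below; the host, input path, index name, and partition count are placeholders rather than our real values:

    import org.apache.spark.sql.SparkSession
    import org.elasticsearch.spark.sql._  // adds saveToEs to DataFrames

    val spark = SparkSession.builder()
      .appName("es-bulk-load")
      .config("es.nodes", "es-node-1")                // placeholder ES host
      .getOrCreate()

    // Read the source data and pick the write parallelism explicitly:
    // each Spark task opens its own bulk-indexing connection to ES.
    val df = spark.read.parquet("s3://bucket/path/")  // placeholder input path

    df.repartition(100)                               // placeholder partition count
      .saveToEs("myindex/mytype")                     // placeholder index/type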

So I'm curious: is anyone else getting better indexing performance than we are?

@jspooner What sort of workloads are you performing on the ingest nodes? Generally, ingest nodes are helpful as a unified ingestion gateway into Elasticsearch (especially if you are sourcing data from many places), but if you are primarily inserting data only from Spark, I'm wondering if it makes sense to move the ingest logic into Spark and aim the resulting documents directly at the data nodes in the cluster. Hypothetically this could boost throughput by enriching the data where it already lives (in Spark, instead of shipping it to an ingest node) and then sending it to nodes that have a chance to consume the document immediately, skipping the second hop to route it to the appropriate primary shard (which saves network I/O overhead).
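
A very rough sketch of what I mean, with a made-up enrichment column and placeholder hosts (es.nodes.data.only is already the default, I'm just calling it out):

    import org.apache.spark.sql.DataFrame
    import org.apache.spark.sql.functions.current_timestamp
    import org.elasticsearch.spark.sql._

    // Do the "ingest pipeline" work in Spark, then write straight at the data nodes.
    def enrichAndWrite(df: DataFrame): Unit = {
      val enriched = df.withColumn("ingested_at", current_timestamp())  // stand-in for pipeline logic
      enriched.saveToEs("myindex/mytype", Map(
        "es.nodes"           -> "data-node-1,data-node-2",  // placeholder data-node hosts
        "es.nodes.data.only" -> "true"  // the default: bulks go to data nodes, no es.ingest.pipeline hop
      ))
    }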

PS. I absolutely must compliment you on your impressive statistics collection regarding your performance tuning. Nice!

@james.baiera Our data is only coming from Spark and the elastic cluster is transient with no search load.

In 2.4 we used several client nodes to handle HTTP requests. However, in 5.2 "client nodes" are no longer listed as a node type, so we swapped the client nodes for ingest nodes. Actually, the first test I showed was not using ingest nodes, and I was seeing a lower indexing rate when scaling the number of nodes up.

I'm about to run some more jobs right now, so I'll try removing the ingest nodes again. Maybe I just need to scale the Elastic cluster way, way up. I also just started using the Shrink API so the shard count matches the number of nodes. I'll post the results of my next load.

And thanks for the compliment but I felt a little crazy when I would add more ES nodes and get a slower indexing rate!

Client nodes as an explicit node type have gone away, but they are still around in the sense of nodes that have the "master", "data" and "ingest" roles turned off. Every node in Elasticsearch can technically act as a client; in 5.x, when we have the option to target client nodes only, we look for nodes that have no roles enabled.

That said, if your cluster is transient with no search load, it might make sense to zero in on the default node targeting, which is directly to data nodes. Would you be able to share your job configurations and cluster layout/index settings here? Writing explicitly to data nodes can sometimes be less advantageous when using more complex settings (like skewed shard/node sizes, or multi-index writing).

@james.baiera

Here is a link to my es cluster configuration, mappings and some spark files.

  • Spark Cluster is normally 10x r3.2xlarge running emr-5.2.0.
  • Indexing rate is viewed in Kibana.
  • I also pull start and end times from Kibana to calculate the overall indexing rate for the bulk load (quick arithmetic below).
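
For example (numbers here are made up just to show the arithmetic, not results from a real run):

    // Hypothetical figures to illustrate how the overall rate is derived.
    val totalDocs     = 500000000L   // documents written by the job
    val wallClockSecs = 7200L        // end time minus start time, from Kibana
    val overallRate   = totalDocs / wallClockSecs   // ≈ 69,444 docs per second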

I've tried four different cluster configurations:
1. ES 2.4

  • 3 Master
  • 3 Client
  • 9 Data nodes
  • Indexing rate ~20k per second

2. ES 5.0

  • i2.2xlarge
  • 1 Master
  • 1 Data
  • Indexing rate ~65k per second

3. ES 5.0.0

  • i2.2xlarge
  • 1 Master
  • 5 ingest
  • 1 up to 15 Data Nodes
  • Indexing rate ~70k per second

4. ES 5.2.1

  • i2.2xlarge
  • 1 Master
  • 20 Data
  • Indexing rate ~50k per second
  • These stats can be seen here.

Another test, pushing partitions up to 400, resulted in a slower indexing rate. I'm fairly confident I could have a 100-node cluster and my indexing rate would be the same.

@james.baiera I also ensure there is no skew in the Spark partitions. I can see that by running df.rdd.mapPartitions(iter => Array(iter.size).iterator, true).take(df.rdd.partitions.length) (a commented version is below).
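
A more readable version of that check (df is the DataFrame I'm about to write; the min/max summary is just for eyeballing skew):

    // Count rows per Spark partition without shuffling the data.
    val partitionSizes = df.rdd
      .mapPartitions(iter => Iterator(iter.size), preservesPartitioning = true)
      .collect()

    // If min and max are close, the write load is spread evenly across tasks.
    println(s"partitions=${partitionSizes.length} min=${partitionSizes.min} max=${partitionSizes.max}")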

@james.baiera I finished a full day of loading into a 20-node cluster with the same indexing rate as a 2-node cluster. What I can say is that with 20 nodes Elasticsearch never crashes.

@jspooner Just checking in to see if you've made any further progress beyond your previous posts. My main piece of advice would be to attach a profiling tool like VisualVM to the running Spark tasks and analyze the CPU usage snapshots to see if there are any spots in the connector that are causing a bottleneck in your ingestion. Looking at the last post, you're seeing quite a lot of rejections from the bulk thread pool. If you find that the connector is mostly waiting on bulk requests, then there might be some settings on the Elasticsearch side that need tuning.
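
As a quick way to watch those rejections while a load is running, polling the cat API on any node works; the host below is a placeholder, and in 5.x the relevant pool is named bulk:

    import scala.io.Source

    // "rejected" climbing during a load means Elasticsearch is pushing back on bulk
    // requests, and the connector has to retry those documents.
    val threadPoolStats = Source
      .fromURL("http://es-host:9200/_cat/thread_pool/bulk?v&h=node_name,active,queue,rejected")
      .mkString
    println(threadPoolStats)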

@james.baiera no we haven't been able to improve the situation yet.

  • On the Spark side I see very little CPU because my job is simply reading Parquet and pushing to ES.
  • The bulk error rate increases when I push more data, either by adding partitions or by sending more bytes per request.

We will be loading more data this week so I'll post if we discover anything new.

Interesting. My intuition would say increase the batch size (es.batch.size.bytes & es.batch.size.entries). It looks like the bottleneck is in Elasticsearch itself, because of the rejected bulks. These tips might also help: https://www.elastic.co/guide/en/elasticsearch/reference/5.2/tune-for-indexing-speed.html
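
For example, something along these lines on the writer (the values are only starting points to experiment with, not recommendations, and df is the DataFrame being written):

    import org.elasticsearch.spark.sql._

    // Larger bulks per task, plus explicit retry behaviour for when ES pushes back.
    df.saveToEs("myindex/mytype", Map(
      "es.batch.size.entries"      -> "5000",  // default is 1000 docs per bulk request
      "es.batch.size.bytes"        -> "5mb",   // default is 1mb per bulk request
      "es.batch.write.retry.count" -> "6",     // retries after bulk rejections (default 3)
      "es.batch.write.retry.wait"  -> "30s"    // wait between retries (default 10s)
    ))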
