Tune for indexing speed

Hey Everyone,

We're having some issues optimizing the indexing speed on our cluster. Try as we might, we can't get above 200k documents per second. Here are the things we've tried:

  1. Increasing the number of documents per bulk request - we tried anywhere between 500 and 50k. There seems to be no benefit in going above 5k
  2. Increasing the number of threads in Kafka - we tried between 5 and 50; there is no benefit in going above the number of shards plus replicas
  3. Increasing the refresh_interval to 30s
  4. Setting the number of replicas to 0 on the index that's being written into (items 3 and 4 are sketched right after this list)
  5. Adding additional nodes for Kafka to output to, as well as trying a single node (Kafka's output goes to a single LB which round-robins across a number of Elasticsearch nodes)
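
For reference, items 3 and 4 boil down to an index settings change like this (the index name here is just a placeholder, not our real one):

PUT my-write-index/_settings
{
  "index": {
    "refresh_interval": "30s",
    "number_of_replicas": 0
  }
}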

The nodes we use as outputs for Kafka are coordinating-only nodes with 6 CPUs and 64GB of RAM. Resource usage on those nodes stays well under 10%.

We also tried a bunch of cluster settings, including concurrent rebalances, concurrent recoveries and indices recovery max bandwidth, all of which were set to extremely high values for the purpose of testing. None of them had any impact on the indexing speed; the only reason we tried them was that we were out of ideas.
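
To give an idea, the changes were along these lines (the exact values here are just examples, we set them very high purely for testing):

PUT _cluster/settings
{
  "persistent": {
    "cluster.routing.allocation.cluster_concurrent_rebalance": 20,
    "cluster.routing.allocation.node_concurrent_recoveries": 20,
    "indices.recovery.max_bytes_per_sec": "2gb"
  }
}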

The resources are not the issue. We have 20 hot nodes with 10 CPUs and 64GB of RAM each (32GB of which is dedicated to the JVM heap), and NVMe drives with read/write speeds in the thousands. The network is 10G, and at most 2.5G of it is used between Kafka and the nodes we're sending the data to at any given moment.

The number of shards is set to 6 plus a replica, so 12 hot nodes are ingesting in total. CPU never peaks above 70%, and usual usage is in the 50s. JVM heap usage never goes above 20G. Write I/O tends to be about 200/s on each node.

There are no WARN events in the logs at all, so we're not hitting Elasticsearch with too many requests. Elasticsearch's rpm was installed through yum, so there should be no need to set any ulimit values except unlimited memlock for the systemd service, as far as we know.

Does anyone have any tips we can try next? The only thing we haven't tried so far is increasing the index buffer size, but we don't really think that will help, as 10% of 32G should be more than enough for 2 heavy indices.
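
(If we do end up trying it, it would just be this static setting in elasticsearch.yml on the hot nodes, bumped up from the default 10% - the 20% here is only an example:)

# elasticsearch.yml (static setting, needs a node restart)
indices.memory.index_buffer_size: 20%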

Thanks for any help in advance!

Which version of Elasticsearch are you using?

Are these on 10G networking like the data nodes? How many of these do you have?

Are these local disks attached directly to the nodes? What type of disks are you using?

How many different indices are you actively indexing into?

If you have 20 hot nodes, why not set this to 10 or 20 primary shards with a replica?

What is the average size of your documents?

How many partitions are you reading from? How many concurrent threads do you have writing into Elasticsearch?

> Which version of Elasticsearch are you using?

Currently we're using 8.3.

> Are these on 10G networking like the data nodes? How many of these do you have?

We have two 10G switches and the load is distributed evenly among them.

> Are these local disks attached directly to the nodes? What type of disks are you using?

They are virtual disks which are presented to the hypervisors as JBODs.

> How many different indices are you actively indexing into?
> If you have 20 hot nodes, why not set this to 10 or 20 primary shards with a replica?

At the moment only two indices, though that will change in the future. We don't go higher because the index size might become too large, which would hurt read speeds. ILM might also have issues when moving data between tiers, as it only works with entire indices. With 10 primary shards the indices would end up at 1TB.

> What is the average size of your documents?

The average size is about 25KB.

> How many partitions are you reading from? How many concurrent threads do you have writing into Elasticsearch?

I don't quite get what you mean by partitions. The number of concurrent threads is 12 at the moment, as there was no benefit in increasing it further. Mind you, the benefit we got from 12 threads compared to something like 3-5 was also extremely small, probably about 5%.

How many coordinating only nodes do you have?

I do not understand this reasoning at all. Can you please elaborate on the use case and sharding strategy?

That is quite large. Are you using nested documents and mappings?

Do you update documents or are they immutable?

A Kafka topic can have one or more partitions. How many topics and partitions are you reading from?

What are you using to index into Elasticsearch? Where does this run (on Kafka nodes or dedicated nodes)?

> How many coordinating only nodes do you have?

5 at the moment, but the index rate doesn't change at all between using only one of them or all 5. It also doesn't change when we skip the LB and go straight to a node.

> I do not understand this reasoning at all. Can you please elaborate on the use case and sharding strategy?

We roll over when any primary shard reaches 50GB. Since we would have 20 shards, the total size of the index would be 1TB. ILM moves an entire index through tiers, and it also resets an index's age when it goes from being the write index to a rolled-over index. See the example below:

GET index-2022.11.14.07-000005/_ilm/explain
{
  "indices": {
    "index-2022.11.14.07-000005": {
      ...
      "age": "36.68m",
      ...
    }
  }
}

POST /index/_rollover

GET index-2022.11.14.07-000005/_ilm/explain
{
  "indices": {
    "index-2022.11.14.07-000005": {
      ...
      "age": "4.01s",
      ...
    }
  }
}

Since ILM would need to wait for one shard to reach 50GB, the difference in age between all these indices would also be very big. This means that instead of data constantly flowing through the tiers, it would be moved in massive bulks.
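
Just to make the rollover condition concrete, the relevant bit of the policy is essentially this (policy name made up and the other phases omitted for the example):

PUT _ilm/policy/netflow-policy
{
  "policy": {
    "phases": {
      "hot": {
        "actions": {
          "rollover": {
            "max_primary_shard_size": "50gb"
          }
        }
      }
    }
  }
}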

> That is quite large. Are you using nested documents and mappings?
> Do you update documents or are they immutable?

It's netflow, so the logs are very large. They are also stripped down a lot before entering Kafka to reduce their size. They are immutable.

> A Kafka topic can have one or more partitions. How many topics and partitions are you reading from?
> What are you using to index into Elasticsearch? Where does this run (on Kafka nodes or dedicated nodes)?

We are not reading from Kafka ourselves; Kafka is pushing data into Elasticsearch through the Confluent Elasticsearch connector. It runs on the Kafka nodes.

Then the Confluent connector is reading from Kafka and indexing into Elasticsearch.

The first thing I would recommend you do is to verify that it indeed is Elasticsearch that is the bottleneck. I have on numerous occasions seen users trying to tune Elasticsearch and not being able to identify what is limiting throughput when the bottleneck instead exists in the indexing pipeline.

If you had external processes reading from Kafka on separate hardware I would have recommended removing the Elasticsearch output and instead write to disk to verify that this increases throughput. Since your indexing process is running on the Kafka nodes you will need to take a different approach.

One way could be to set up a few Logstash instances on dedicated nodes and have these read from Kafka over the network and write small messages with a timestamp to file so this does not become a bottleneck. If you do not want to use Logstash you can probably use something else or even create a custom script/application. If you can make this read at a significantly higher rate than you are currently indexing into Elasticsearch we can continue troubleshooting from there.
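
A minimal pipeline for such a test could look something like this (hosts, topic and path are placeholders, adjust to your setup):

input {
  kafka {
    bootstrap_servers => "kafka1:9092,kafka2:9092"
    topics => ["netflow"]
    group_id => "throughput-test"
    consumer_threads => 4
  }
}

output {
  file {
    path => "/var/tmp/kafka-read-test.log"
    # write only a timestamp per event so the file itself does not become the bottleneck
    codec => line { format => "%{@timestamp}" }
  }
}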

So far we haven't had much luck. The network does not seem to be the bottleneck, and neither are the resources. Setting up multiple Logstash instances with a configuration optimized for maximum Kafka read throughput would take some time, so we'll leave that as a last resort.

We've come across this post: Performance Considerations for Elasticsearch Indexing | Elastic Blog. They talk about index.store.throttle.type, but I can't seem to find that setting anywhere in Elastic's documentation.

Looking through the default cluster settings, there's no mention of it either. Is this setting still worth trying, or has it been deprecated in version 8? Additionally, we'll try disabling refresh entirely (setting refresh_interval to -1) and see if that helps.
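
(For reference, this is how we looked through the defaults, both at cluster and index level - the index name is just an example:)

GET _cluster/settings?include_defaults=true&flat_settings=true

GET my-write-index/_settings?include_defaults=true&flat_settings=true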

I think this is a necessary first step. I have on numerous occasions seen users troubleshooting Elasticsearch without being able to identify any bottleneck, only to later realise that the limitation is earlier in the chain. If you are reading 200,000 25KB events per second, that corresponds to close to 5GB/s. This could easily cause network bottlenecks somewhere in the system, e.g. on the Kafka nodes, or cause congestion on disks.

Before we know that the bottleneck is indeed in Elasticsearch (and there is no clear evidence of that, as resource usage is low and you have a powerful cluster), there is in my opinion no point in trying to tune Elasticsearch.

Me too.

If you are pushing Elasticsearch to its limits then it should be pushing back on the client, for instance rejecting some docs from indexing with a 429 status code. You can also look for evidence of ongoing backpressure in GET _nodes/stats, for instance checking things like (from the top of my head) breakers.*.tripped, indexing_pressure.memory.total.*_rejections and thread_pool.*.rejected. If stats like these are constant, your cluster is handling everything it receives without issue.
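
For example, something like this pulls out just those counters (the exact stat names above are from memory, so adjust as needed):

GET _nodes/stats/breaker,thread_pool,indexing_pressure?filter_path=nodes.*.breakers.*.tripped,nodes.*.thread_pool.write.rejected,nodes.*.indexing_pressure.memory.total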


Alright, we'll look into setting up a few Logstash instances to write directly to disk then. We shouldn't be hitting any limits yet, as peak CPU usage on the hot nodes is about 70-75%.
We were getting 429 errors a few days back, but upping the shard count so the load is distributed more evenly resolved that. I'll take a look at the _nodes/stats API as well.

Thank you for the continued help Christian and David.
Cheers!


> They are virtual disks which are presented to the hypervisors as JBODs

This is less than ideal. Any kind of storage virtualization adds latency, which can have a dramatic negative impact on performance.

To give you an idea of what bare-metal servers with local attached storage can achieve...

At ElastiFlow we are 100% focused on network flow data (NetFlow/IPFIX/sFlow). We extract a lot of detail from the records and do a lot of enrichment, so our records can be pretty complex. Even then, the average size of an indexed record is 500 to maybe 600 bytes. 25K seems very large, even for very heavily enriched netflow.

When we tested the latest release of the ElastiFlow collector, we were writing to a 5-node ES cluster. Each node had 16 cores, 128GB RAM and local NVMe storage. We were able to ingest at 350K records/sec.

As soon as you start mentioning virtualization and hypervisors you are going to have challenges handling the data rates that you are targeting.

