High CPU Usage on a few data nodes / Hotspotting of data

Hi team, we have an Elasticsearch cluster with the following configuration:

  • ES version 7.17.0
  • 60 data nodes
  • 50 primary shards
  • 2 replicas for each primary shard

Here, we are using a particular custom routing key with routing_partition_size = 5, meaning data for the same key can be spread across 5 different shards.
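
For reference, a minimal sketch of how an index with this kind of routing setup is defined (the index name here is a placeholder and the mapping is trimmed, not our real one):

    PUT my_routed_index
    {
      "settings": {
        "number_of_shards": 50,
        "number_of_replicas": 2,
        "routing_partition_size": 5
      },
      "mappings": {
        "_routing": { "required": true }
      }
    }

With routing_partition_size set, Elasticsearch picks the shard roughly as (hash(_routing) + hash(_id) % routing_partition_size) % num_primary_shards, so documents sharing a routing value are spread over up to 5 shards instead of 1.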

The issue is that our routing key is skewed, and it is not possible for us to change it. This skewness is causing 5 primary shards, and therefore roughly 15 of the 60 nodes (counting replicas), to sit at 95% CPU usage during periods of high writes.

On deeper investigation, we found that the high write volume leads to a lot of segment merging, and the CPU spikes line up with the segment-merge graph.

To reduce segment merging, we have also tried tweaking the properties below, but that has not helped either:

index.merge.policy.segments_per_tier
index.merge.scheduler.max_thread_count
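
For reference, they were applied as dynamic index settings along these lines (the values shown are illustrative, not the exact ones we tried; the index name is a placeholder):

    PUT my_routed_index/_settings
    {
      "index.merge.policy.segments_per_tier": 20,
      "index.merge.scheduler.max_thread_count": 1
    }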

The machines we are using have top-notch hardware, and IOPS is not a bottleneck either.

We cannot reduce the writes and we cannot change the routing key logic. Is there any way we can solve this problem?

It sounds like you are using one or more custom indices and not time-based indices. Is that correct?

Is the write load primarily new writes, or is a significant portion updates and/or deletes? If you are doing updates, how are these done? Are some specific subset of documents frequently updated?

Have you looked into the write load distribution and determined how many different routing keys are causing this issue? Is the distribution across these uniform, or do you have a few specific routing keys that dominate?
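
One quick way to get a feel for the skew, at least in terms of document counts (actual write rates you would need to measure at the application or ingest layer), is a terms aggregation on the field you route by, assuming it is a keyword field; the index and field names below are placeholders:

    GET my_routed_index/_search
    {
      "size": 0,
      "aggs": {
        "top_routing_keys": {
          "terms": { "field": "routing_key", "size": 20 }
        }
      }
    }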

You don't mention the benefit you get from the custom routing, just the cost. Is it not worth reviewing the design choice? Actually, let's be straight: if you were designing the solution today, knowing what you know now, would you use the same approach?

Also, obviously I don't know how long you have had this solution in place, but was this a problem last week? Last month? If not, what has changed to make it a pressing problem now? Is the key more skewed now than it was a few weeks ago? If so, why? And will it get progressively more skewed going forward?

  1. 95% of the new writes are updates.
  2. The document contains multiple fields and some of the fields are updated in every write call.
  3. Our custom routing key is a single field; this field has 1,315,608 unique values, of which 5 are highly skewed.

Are these 5 hot IDs spread across different shards? If they overlap, how many shards should be considered hot?

Do you have an application layer in front of Elasticsearch handling updates and writes? If so, might it be an option to separate these 5 keys out into a separate index with a suitable number of primary shards and no routing, in order to spread the load more evenly?
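
To make that concrete, the hot-key index could be as simple as the sketch below (name and shard count are placeholders). The application would index documents for those 5 keys here without any routing parameter, so Elasticsearch hashes on _id and spreads them across all of its shards, and it would query this index directly for those keys:

    PUT my_hot_keys_index
    {
      "settings": {
        "number_of_shards": 15,
        "number_of_replicas": 2
      }
    }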

Something will need to change in order to solve the problem, as I do not believe there is any magic solution. You could try to reroute shards to get a better balance, but if the number of hot shards is small it will only get you so far.

We operate at a very large scale with a large cluster; changing this custom routing key is not feasible, and moreover we have a lot of use cases and query patterns built on this routing key.

We have been facing this issue for the past 4 years, but with growing data and the data getting more skewed for a few routing IDs, the issue is now becoming cumbersome and our reads/writes are taking a hit.

These 5 IDs are spread across 5 shards, and since we have 2 replicas of each primary, a total of 15 shards become hot. This causes 15 of the 63 data nodes to spike to 90-95% CPU during periods of high writes (high segment merge count).

We manually rerouted the shards to get a better balance, but that is not giving any major improvement, barely 5-6%.

Thanks for the clear answer.

Let me remind you that this is your choice; it is not the out-of-the-box behavior. And I'm not criticizing that, 4 years is a very decent span, but by your own report circumstances have changed, no?

IMHO you can't tweak or tune yourself out of this design choice. If in the short term the situation is going to get more skewed, whether from the keys themselves or simply from more data, then any small percentage improvement you make here or there will be lost within the same period of time.

I agree. So in a sense you are now faced with a choice: actually change something, or try to live with and manage the current, worsening situation. I can't answer that for you, but ask yourself how much worse it would need to get before you considered changing something.

You have 50 primary shards + 2 replicas each, so 150 shards in total, 60 data nodes, and 15 problematic shards. So with this topology, 15 data nodes could each hold one and only one shard, one of the problematic ones, and the other 135 shards could live on the other 45 nodes (which means 3 shards per node). This placement would require significant manual effort to achieve, but the end result is that your 5 skewed keys would effectively have a 15-node dedicated sub-cluster, as sketched below.
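
As a sketch of the mechanics (index and node names are placeholders): first stop the balancer from undoing the placement, then move the hot shards explicitly:

    PUT _cluster/settings
    {
      "persistent": { "cluster.routing.rebalance.enable": "none" }
    }

    POST _cluster/reroute
    {
      "commands": [
        { "move": { "index": "my_routed_index", "shard": 2,
                    "from_node": "data-node-17", "to_node": "data-node-03" } }
      ]
    }

Note that disabling rebalancing is cluster-wide, so after any node joins or fails you would have to restore the balance by hand.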


I think it will be very difficult to improve the situation without making major changes. The only thing I can think of, apart from what I have already mentioned, is to check whether you have individual documents that are frequently updated. If that is the case and you are sending multiple update requests within a short timeframe, it might help to squash these into a single larger update, as frequent updates to individual documents tend to add a lot of overhead and increase merging activity.
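
To illustrate the idea (index, id, routing and field names are placeholders): instead of sending several partial updates for the same document within a short window, the application would merge the field changes and send a single bulk update:

    POST _bulk
    { "update": { "_index": "my_routed_index", "_id": "doc-42", "routing": "hot-key-1" } }
    { "doc": { "field_a": 10, "field_b": "new-value" } }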

Thanks for the second suggestion, but we can't do that either due to the scale at which we are working; our write rate is 10K+.

We are already doing that :confused: .

Is there any way we can throttle or do something about the segment merge process which is causing all this in the first place?

Also, is it possible to connect via Discord or a Google Meet to discuss this issue in more detail? We have been trying to solve this problem for the past 3 years :confused:

Due to the growing scale, we might end up having more than 5 skewed data points in a couple of months, in which case managing an entirely dedicated sub-cluster for those skewed data points would be highly resource-intensive.

I mean, you can send me a PM, and I would be delighted to discuss further, for my normal (and very reasonable) hourly rate :slight_smile:

For free, let's keep it here as it is an interesting problem that others might learn from.

So, that reinforces the core point. How much worse will it need to get before you consider changing something more basic?

You've done the right thing and asked a well-constructed and detailed question on the forum. You are hoping for a magic bullet; I don't think there is one. You've played with some settings already, and have either already tried or rejected the few things I or @Christian_Dahlqvist suggested.

EDIT: We're taking your word for it here that there is a CPU issue. It is very easy to mis-classify a CPU issue when actually you have an IO issue. You wrote that "IOPS is also not a bottleneck", but be careful there - it cannot be unlimited. If all your threads are stuck waiting on IO, you might see a high load average, but the underlying issue is the IO caused by the segment merges. You should be able to see this with tools like iostat (assuming Linux) and checking %util, await, ... Or look at hot_threads on a node while under stress.
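
For example, while a node is under stress, something along these lines (thread count and interval are just example values), then compare the cpu= share against the other/wait time in the output:

    GET _nodes/hot_threads?threads=3&interval=500ms&type=cpu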


Thanks a lot for the reply. Yes, I ran the hot threads command to get a dump; here are my findings.

100.0% [cpu=92.2%, other=7.8%] (500ms out of 500ms) cpu usage by thread 'elasticsearch[10.67.34.22][[nfr_index_final_2][2]: Lucene Merge Thread #121]'
     5/10 snapshots sharing following 17 elements
       app//org.apache.lucene.store.DataOutput.writeLong(DataOutput.java:213)
       app//org.apache.lucene.codecs.lucene84.ForUtil.encode(ForUtil.java:291)
       app//org.apache.lucene.codecs.lucene84.PForUtil.encode(PForUtil.java:93)
       app//org.apache.lucene.codecs.lucene84.Lucene84PostingsWriter.addPosition(Lucene84PostingsWriter.java:310)
       app//org.apache.lucene.codecs.PushPostingsWriterBase.writeTerm(PushPostingsWriterBase.java:161)
       app//org.apache.lucene.codecs.blocktree.BlockTreeTermsWriter$TermsWriter.write(BlockTreeTermsWriter.java:907)
       app//org.apache.lucene.codecs.blocktree.BlockTreeTermsWriter.write(BlockTreeTermsWriter.java:318)
       app//org.apache.lucene.codecs.FieldsConsumer.merge(FieldsConsumer.java:105)
       app//org.apache.lucene.codecs.perfield.PerFieldPostingsFormat$FieldsWriter.merge(PerFieldPostingsFormat.java:197)
       app//org.apache.lucene.index.SegmentMerger.mergeTerms(SegmentMerger.java:244)
       app//org.apache.lucene.index.SegmentMerger.merge(SegmentMerger.java:139)
       app//org.apache.lucene.index.IndexWriter.mergeMiddle(IndexWriter.java:4757)
       app//org.apache.lucene.index.IndexWriter.merge(IndexWriter.java:4361)
       app//org.apache.lucene.index.IndexWriter$IndexWriterMergeSource.merge(IndexWriter.java:5920)
       app//org.apache.lucene.index.ConcurrentMergeScheduler.doMerge(ConcurrentMergeScheduler.java:626)
       app//org.elasticsearch.index.engine.ElasticsearchConcurrentMergeScheduler.doMerge(ElasticsearchConcurrentMergeScheduler.java:94)
       app//org.apache.lucene.index.ConcurrentMergeScheduler$MergeThread.run(ConcurrentMergeScheduler.java:684)
     5/10 snapshots sharing following 13 elements
       app//org.apache.lucene.codecs.PushPostingsWriterBase.writeTerm(PushPostingsWriterBase.java:161)
       app//org.apache.lucene.codecs.blocktree.BlockTreeTermsWriter$TermsWriter.write(BlockTreeTermsWriter.java:907)
       app//org.apache.lucene.codecs.blocktree.BlockTreeTermsWriter.write(BlockTreeTermsWriter.java:318)
       app//org.apache.lucene.codecs.FieldsConsumer.merge(FieldsConsumer.java:105)
       app//org.apache.lucene.codecs.perfield.PerFieldPostingsFormat$FieldsWriter.merge(PerFieldPostingsFormat.java:197)
       app//org.apache.lucene.index.SegmentMerger.mergeTerms(SegmentMerger.java:244)
       app//org.apache.lucene.index.SegmentMerger.merge(SegmentMerger.java:139)
       app//org.apache.lucene.index.IndexWriter.mergeMiddle(IndexWriter.java:4757)
       app//org.apache.lucene.index.IndexWriter.merge(IndexWriter.java:4361)
       app//org.apache.lucene.index.IndexWriter$IndexWriterMergeSource.merge(IndexWriter.java:5920)
       app//org.apache.lucene.index.ConcurrentMergeScheduler.doMerge(ConcurrentMergeScheduler.java:626)
       app//org.elasticsearch.index.engine.ElasticsearchConcurrentMergeScheduler.doMerge(ElasticsearchConcurrentMergeScheduler.java:94)
       app//org.apache.lucene.index.ConcurrentMergeScheduler$MergeThread.run(ConcurrentMergeScheduler.java:684)
 
::: {10.69.163.155}{mwhwvUBeQcO4kW_9a-TtOw}{dzj7WKxgSSa_Ffqw7gBBvQ}{10.69.163.155}{10.69.163.155:9300}{cdfhirstw}{xpack.installed=true, transform.node=true}
   Hot threads at 2025-07-24T08:42:35.590Z, interval=500ms, busiestThreads=3, ignoreIdleThreads=true:
    
   100.0% [cpu=86.3%, other=13.7%] (500ms out of 500ms) cpu usage by thread 'elasticsearch[10.69.163.155][[nfr_index_final_2][4]: Lucene Merge Thread #157]'
     2/10 snapshots sharing following 13 elements
       app//org.apache.lucene.codecs.PushPostingsWriterBase.writeTerm(PushPostingsWriterBase.java:146)
       app//org.apache.lucene.codecs.blocktree.BlockTreeTermsWriter$TermsWriter.write(BlockTreeTermsWriter.java:907)
       app//org.apache.lucene.codecs.blocktree.BlockTreeTermsWriter.write(BlockTreeTermsWriter.java:318)
       app//org.apache.lucene.codecs.FieldsConsumer.merge(FieldsConsumer.java:105)
       app//org.apache.lucene.codecs.perfield.PerFieldPostingsFormat$FieldsWriter.merge(PerFieldPostingsFormat.java:197)
       app//org.apache.lucene.index.SegmentMerger.mergeTerms(SegmentMerger.java:244)
       app//org.apache.lucene.index.SegmentMerger.merge(SegmentMerger.java:139)
       app//org.apache.lucene.index.IndexWriter.mergeMiddle(IndexWriter.java:4757)
       app//org.apache.lucene.index.IndexWriter.merge(IndexWriter.java:4361)
       app//org.apache.lucene.index.IndexWriter$IndexWriterMergeSource.merge(IndexWriter.java:5920)
       app//org.apache.lucene.index.ConcurrentMergeScheduler.doMerge(ConcurrentMergeScheduler.java:626)
       app//org.elasticsearch.index.engine.ElasticsearchConcurrentMergeScheduler.doMerge(ElasticsearchConcurrentMergeScheduler.java:94)
       app//org.apache.lucene.index.ConcurrentMergeScheduler$MergeThread.run(ConcurrentMergeScheduler.java:684)
     4/10 snapshots sharing following 13 elements
       app//org.apache.lucene.codecs.PushPostingsWriterBase.writeTerm(PushPostingsWriterBase.java:161)
       app//org.apache.lucene.codecs.blocktree.BlockTreeTermsWriter$TermsWriter.write(BlockTreeTermsWriter.java:907)
       app//org.apache.lucene.codecs.blocktree.BlockTreeTermsWriter.write(BlockTreeTermsWriter.java:318)
       app//org.apache.lucene.codecs.FieldsConsumer.merge(FieldsConsumer.java:105)
       app//org.apache.lucene.codecs.perfield.PerFieldPostingsFormat$FieldsWriter.merge(PerFieldPostingsFormat.java:197)
       app//org.apache.lucene.index.SegmentMerger.mergeTerms(SegmentMerger.java:244)
       app//org.apache.lucene.index.SegmentMerger.merge(SegmentMerger.java:139)
       app//org.apache.lucene.index.IndexWriter.mergeMiddle(IndexWriter.java:4757)
       app//org.apache.lucene.index.IndexWriter.merge(IndexWriter.java:4361)
       app//org.apache.lucene.index.IndexWriter$IndexWriterMergeSource.merge(IndexWriter.java:5920)
       app//org.apache.lucene.index.ConcurrentMergeScheduler.doMerge(ConcurrentMergeScheduler.java:626)
       app//org.elasticsearch.index.engine.ElasticsearchConcurrentMergeScheduler.doMerge(ElasticsearchConcurrentMergeScheduler.java:94)
       app//org.apache.lucene.index.ConcurrentMergeScheduler$MergeThread.run(ConcurrentMergeScheduler.java:684)
     2/10 snapshots sharing following 14 elements
       app//org.apache.lucene.index.MappingMultiPostingsEnum.nextDoc(MappingMultiPostingsEnum.java:103)
       app//org.apache.lucene.codecs.PushPostingsWriterBase.writeTerm(PushPostingsWriterBase.java:133)
       app//org.apache.lucene.codecs.blocktree.BlockTreeTermsWriter$TermsWriter.write(BlockTreeTermsWriter.java:907)
       app//org.apache.lucene.codecs.blocktree.BlockTreeTermsWriter.write(BlockTreeTermsWriter.java:318)
       app//org.apache.lucene.codecs.FieldsConsumer.merge(FieldsConsumer.java:105)
       app//org.apache.lucene.codecs.perfield.PerFieldPostingsFormat$FieldsWriter.merge(PerFieldPostingsFormat.java:197)
       app//org.apache.lucene.index.SegmentMerger.mergeTerms(SegmentMerger.java:244)
       app//org.apache.lucene.index.SegmentMerger.merge(SegmentMerger.java:139)
       app//org.apache.lucene.index.IndexWriter.mergeMiddle(IndexWriter.java:4757)
       app//org.apache.lucene.index.IndexWriter.merge(IndexWriter.java:4361)
       app//org.apache.lucene.index.IndexWriter$IndexWriterMergeSource.merge(IndexWriter.java:5920)
       app//org.apache.lucene.index.ConcurrentMergeScheduler.doMerge(ConcurrentMergeScheduler.java:626)
       app//org.elasticsearch.index.engine.ElasticsearchConcurrentMergeScheduler.doMerge(ElasticsearchConcurrentMergeScheduler.java:94)
       app//org.apache.lucene.index.ConcurrentMergeScheduler$MergeThread.run(ConcurrentMergeScheduler.java:684)
     2/10 snapshots sharing following 12 elements
       app//org.apache.lucene.codecs.blocktree.BlockTreeTermsWriter$TermsWriter.write(BlockTreeTermsWriter.java:907)
       app//org.apache.lucene.codecs.blocktree.BlockTreeTermsWriter.write(BlockTreeTermsWriter.java:318)
       app//org.apache.lucene.codecs.FieldsConsumer.merge(FieldsConsumer.java:105)
       app//org.apache.lucene.codecs.perfield.PerFieldPostingsFormat$FieldsWriter.merge(PerFieldPostingsFormat.java:197)
       app//org.apache.lucene.index.SegmentMerger.mergeTerms(SegmentMerger.java:244)
       app//org.apache.lucene.index.SegmentMerger.merge(SegmentMerger.java:139)
       app//org.apache.lucene.index.IndexWriter.mergeMiddle(IndexWriter.java:4757)
       app//org.apache.lucene.index.IndexWriter.merge(IndexWriter.java:4361)
       app//org.apache.lucene.index.IndexWriter$IndexWriterMergeSource.merge(IndexWriter.java:5920)
       app//org.apache.lucene.index.ConcurrentMergeScheduler.doMerge(ConcurrentMergeScheduler.java:626)
       app//org.elasticsearch.index.engine.ElasticsearchConcurrentMergeScheduler.doMerge(ElasticsearchConcurrentMergeScheduler.java:94)
       app//org.apache.lucene.index.ConcurrentMergeScheduler$MergeThread.run(ConcurrentMergeScheduler.java:684)
    
   100.0% [cpu=85.6%, other=14.4%] (500ms out of 500ms) cpu usage by thread 'elasticsearch[10.69.163.155][write][T#19]'
     2/10 snapshots sharing following 32 elements
       app//org.apache.lucene.codecs.lucene84.Lucene84PostingsWriter.startDoc(Lucene84PostingsWriter.java:223)

Further, here are a few more things we have already done:

  1. Manually allocating shards for the most optimised placement
  2. Adding coordinator nodes
  3. Batching the writes before sending them to Elasticsearch
  4. Increasing the number of cores per machine from 20 to 32
  5. Adding a de-duping layer that reduced writes by 90%

Even after all these approaches, our nodes touch 95% CPU at times.

Also, I have 2 side questions related to Elasticsearch which we weren't able to resolve:

  1. When we fire a read aggregation query, the coordinator nodes only show about a 5% CPU spike, even though the data nodes used to show about a 20% CPU spike for the same query back when we had no coordinator nodes.
  2. When we reindexed the data into a new index with routing_partition_size increased from 5 to 8, all the data for a single custom routing key went to a single shard instead of being spread across 8 different shards.

Can you please help with these 2 queries as well? I’ll be really grateful :smiley:

That's impressive. Well done.

Please check the IO side, and share some details about it: describe how the storage is set up and what you are using for it.

Can you please also help with the 2 additional queries above?

  1. When we fire a read aggregation query, the coordinator nodes only show about a 5% CPU spike, even though the data nodes used to show about a 20% CPU spike for the same query back when we had no coordinator nodes.

  2. When we reindexed the data into a new index with routing_partition_size increased from 5 to 8, all the data for a single custom routing key went to a single shard instead of being spread across 8 different shards.