Elasticsearch 7.17.10 indexing bottleneck on i3.2xlarge and d3.2xlarge nodes in EKS

My 7.17.10 cluster is hosted in AWS EKS and managed by ECK. It appears to top out at around 90k documents indexed per second (including replicas) and I haven't been able to identify the bottleneck. Adding more hot-tier nodes hasn't improved the indexing rate, and doubling the number of clients sending bulk index requests also hasn't meaningfully changed it. During peak loads the incoming rate sometimes reaches 35k documents/s, which with 2 replicas works out to roughly 105k documents/s of indexing work and pushes past that ~90k ceiling.

Cluster sizing:

  • 12 hot: i3.2xlarge, 7.8 CPU, 55 GiB mem, 35 GB heap (experimenting with >50% of RAM for heap, which seemed to help a little)
  • 5 warm: d3.2xlarge, 7 CPU, 60 GiB mem, 30 GB heap
  • 22 cold: d3.2xlarge, 7 CPU, 60 GiB mem, 30 GB heap
  • 3 masters as pods with 4 CPU, 28 GiB mem, 22 GB heap

This is a multi-tenant cluster covering hundreds of accounts (ES and Kibana are not customer-facing; they're only used for storage and searching through the API). Each customer account has two bootstrap indices for different types of storage, with an ILM policy for warm/cold/delete. The current policy rolls over at 40 GB primary shard size or 7d, keeps indices in warm for 6d (with force merge + read-only), then in cold for 88d before deletion. We currently have ~8.8k primary shards, with each tier staying under 20 shards per GB of heap (much lower than that for the hot tier). Total storage is 194 TB.
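
For reference, the policy is roughly equivalent to the sketch below. The policy name is made up and the min_age values are only an approximation of the timings above, not the exact policy.

    # policy name and min_age values are illustrative
    PUT _ilm/policy/alerts-default
    {
      "policy": {
        "phases": {
          "hot": {
            "actions": {
              "rollover": { "max_primary_shard_size": "40gb", "max_age": "7d" }
            }
          },
          "warm": {
            "min_age": "0d",
            "actions": {
              "readonly": {},
              "forcemerge": { "max_num_segments": 1 }
            }
          },
          "cold": {
            "min_age": "6d",
            "actions": {}
          },
          "delete": {
            "min_age": "94d",
            "actions": { "delete": {} }
          }
        }
      }
    }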

There are some other notes in this post about the initial setup.

During peak load a backlog forms, though the write queue doesn't exceed 150. Write timeouts occur infrequently, and refresh utilization climbs.

Things I've tried without seeing much of a change:

  • add metrics to the stages before Elasticsearch, to verify the slowdown occurs within Elasticsearch itself or at least after the request is handed to the Java REST high-level client
  • increase the hot tier by 50%
  • double the number of client nodes sending batches
  • double the bulk size, and halve the bulk size
  • increase the refresh interval from 30s to 45s on one set of indices, and to 90s on a less important set (settings sketch after this list)
  • increase primary shard count for the most active indices to spread the write load around more
  • enable compression for client connections in case EC2 bandwidth throttling was occurring (I don't see evidence of this, though)
  • increase maxConnTotal and maxConnPerRoute for the httpClientBuilder
  • try increasing indices.memory.index_buffer_size
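
The refresh interval change is just a dynamic index settings update along the lines of the sketch below (the index pattern is a placeholder). indices.memory.index_buffer_size, by contrast, is a static node-level setting, so it has to go into the node configuration (elasticsearch.yml via the ECK nodeSet config) rather than through the API.

    # index pattern is a placeholder
    PUT alerts-*-traffic-*/_settings
    {
      "index": {
        "refresh_interval": "45s"
      }
    }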

I'm not sure whether this is CPU, disk, network, or cluster coordination overhead. Adding hot nodes didn't appear to improve the ingest rate, which suggests CPU/disk aren't the bottleneck, though the hot nodes do run hot. Moving the hot tier from i3.2xlarge to an m-series instance type for more cores might work, but switching to EBS would add latency and cut my IOPS significantly.

The only warning logs I'm seeing are infrequent slow transport message logs. In the past 24h I've only seen them from hot-8. Samples:

From hot-8 to hot-2

sending transport message [Response{25611586}{false}{false}{false}{class org.elasticsearch.action.bulk.BulkShardResponse}] of size [20723] on [Netty4TcpChannel{localAddress=/172.16.4.49:9300, remoteAddress=/172.16.21.255:51360, profile=default}] took [5005ms] which is above the warn threshold of [5000ms] with success [true]

From hot-8 to hot-1

sending transport message [Request{indices:admin/seq_no/retention_lease_background_sync[r]}{63707977}{false}{false}{false}] of size [586] on [Netty4TcpChannel{localAddress=/172.16.4.49:50176, remoteAddress=172.16.21.17/172.16.21.17:9300, profile=default}] took [5005ms] which is above the warn threshold of [5000ms] with success [true]

sending transport message [Request{indices:admin/seq_no/global_checkpoint_sync[r]}{63707946}{false}{false}{false}] of size [387] on [Netty4TcpChannel{localAddress=/172.16.4.49:34570, remoteAddress=172.16.12.17/172.16.12.17:9300, profile=default}] took [5205ms] which is above the warn threshold of [5000ms] with success [true]

sending transport message [Request{indices:data/write/bulk[s][r]}{63671760}{false}{false}{false}] of size [1192199] on [Netty4TcpChannel{localAddress=/172.16.4.49:50068, remoteAddress=172.16.21.17/172.16.21.17:9300, profile=default}] took [5208ms] which is above the warn threshold of [5000ms] with success [true]

Any recommendations on troubleshooting this further?

How many indices are you actively indexing into? How many primary and replica shards do these indices have?

How are you indexing into Elasticsearch? What is the current bulk size? What is the average size of the documents you are indexing?

What does the distribution of tenants look like with respect to traffic volumes?

Are you using dynamic mappings? If so, do the size of mappings differ a lot between tenants?

When increasing the number of client threads writing to Elasticsearch, have you tried grouping tenants so that each bulk request is more targeted and only covers a few tenants and indices?

As well as Christian's questions:

  1. Are all the write threads busy constantly (or equivalently, is there always a nonempty queue for the write pool) on every hot node? If not, you have unused indexing capacity.

  2. Do you see data building up in TCP buffers (according to something like netstat or ss)? Is it building up on the sender or receiver? Any zero-window acks? Those sending transport message warnings are consistent with a network bottleneck, although that's not the only explanation.

There are about 440 indices being actively written to, though some are much busier than others. Almost all of them have a single primary shard, 20 of them have 2 primary shards, and one has 5 primary shards (that one accounts for up to 7k docs/s by itself). All have 2 replicas.
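
(A cat request along these lines gives that per-index breakdown; the index pattern is assumed from the index naming seen elsewhere in this thread.)

    GET _cat/indices/alerts-*?v&h=index,pri,rep&s=pri:desc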

There are 6 clients writing to it using the Java REST high-level client. The RestClient has compression enabled, maxConnTotal 200, and maxConnPerRoute 100. I've split my ingest into two streams so I can experiment with bulk sizes more easily: one uses ~400 documents per request, the other ~1000. The payload size varies since the data comes from many sources, but 2.2-2.5 MiB is most common.

The average time to bulk index is 575ms for the batches with ~400 documents and 1340ms for the batches with ~1000, so 2.5x more documents takes about 2.3x as long, which is close enough to linear. I started with batches of 500, then tried 200, 400, 1000, and 1200 without seeing a major change in throughput. It's always possible that I addressed one bottleneck and just shifted it elsewhere, though.

It varies quite a lot. Some customers contribute under 100 documents per second; one contributes up to 7k per second.

I have an index template defined that should map all of the fields for these indices. It's possible that I missed one - is there an easy way to check?
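
The only check I can think of is to pull the template and the live mapping for a busy index and diff the field lists; any field present in the index mapping but missing from the template would have been added dynamically. Something like the following, where the template name is hypothetical (and GET _template/... instead if it's a legacy template):

    # template name is hypothetical
    GET _index_template/alerts-template
    GET alerts-account442774118064-traffic-000006/_mapping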

That was actually my early design, but I moved to fully random. The pipeline starts with an on-prem collector that sends through AWS Kinesis, with 6 instances of the processor using the KCL library to assign Kinesis shards to each instance. I started with N buckets per account to reduce the number of accounts landing in each shard, but Kinesis utilization was poor even with 17 buckets per account, so I reverted to fully random. Moving to fully random improved my ingestion rate.

Output from GET /_cat/thread_pool?v&s=t,n&h=type,name,node_name,active,queue,rejected,completed filtered for write and hot

type                  name                                   node_name           active queue rejected completed
fixed                 write                                  alerts-es-hot-5          8    28        0  66365436
fixed                 write                                  alerts-es-hot-10         0     0        0  54379936
fixed                 write                                  alerts-es-hot-6          8     3        0  28685717
fixed                 write                                  alerts-es-hot-2          0     0        0  10887698
fixed                 write                                  alerts-es-hot-4          2     0        0  20565589
fixed                 write                                  alerts-es-hot-0          4     0        0  13213680
fixed                 write                                  alerts-es-hot-9          2     0        0  51567645
fixed                 write                                  alerts-es-hot-3          2     0        0  16574976
fixed                 write                                  alerts-es-hot-8          1     0        0  18415580
fixed                 write                                  alerts-es-hot-11         2     0        0  13439315
fixed                 write                                  alerts-es-hot-1          2     0        0  11308770
fixed                 write                                  alerts-es-hot-7          2     0        0  58737563

So it looks like it has unused indexing capacity.

The CPU tends to run pretty hot, though I didn't see much improvement when I added 50% more hot-tier nodes as an experiment (keeping that would be a bit expensive, so I'd need to move away from i3 nodes if more cores is my only option).

GET _cat/nodes/?v=true&s=cpu:desc

ip            heap.percent ram.percent cpu load_1m load_5m load_15m node.role master name
172.16.19.193           35         100  99   11.11   12.94    11.13 hir       -      alerts-es-hot-4
172.16.47.205           38         100  99   10.54   12.36    14.22 hir       -      alerts-es-hot-6
172.16.12.17            40         100  97   17.09   14.78    13.11 hir       -      alerts-es-hot-5
172.16.54.28            48         100  90    6.81    6.24     7.09 hir       -      alerts-es-hot-0
172.16.52.47            42          79   9    2.12    1.58     1.09 r         -      alerts-es-coord2-1
172.16.30.111           48         100   7    0.01    0.25     0.68 crs       -      alerts-es-cold-14
172.16.61.88            13         100   7    0.13    0.27     0.34 crs       -      alerts-es-cold-6
172.16.53.135           16         100   7    0.04    0.17     0.46 crs       -      alerts-es-cold-1
172.16.45.238           54          99  68    4.61    3.57     3.87 hir       -      alerts-es-hot-3
172.16.22.239           15         100   5    0.32    0.30     0.30 crs       -      alerts-es-cold-8
172.16.31.127            7         100   5    0.06    0.17     0.61 rsw       -      alerts-es-warm-3
172.16.1.62             41         100   5    0.30    0.80     0.74 crs       -      alerts-es-cold-2
172.16.0.94             39         100   5    0.08    0.18     0.26 crs       -      alerts-es-cold-0
172.16.53.19            39         100   5    0.01    0.09     0.21 crs       -      alerts-es-cold-15
172.16.10.255           17          98   5    0.29    0.33     0.75 rsw       -      alerts-es-warm-0
172.16.31.35            22         100  48    4.32    4.16     4.27 hir       -      alerts-es-hot-10
172.16.21.255           24         100  46    5.78    4.90     5.17 hir       -      alerts-es-hot-2
172.16.59.171           21         100   4    0.05    0.27     0.57 crs       -      alerts-es-cold-11
172.16.21.17            61         100  39    4.18    3.94     4.01 hir       -      alerts-es-hot-1
172.16.17.165           19         100  36    4.23    4.67     4.69 hir       -      alerts-es-hot-9
172.16.3.211            30          97  33    3.28    4.85     5.14 hir       -      alerts-es-hot-7
172.16.54.142           25          97   3    0.27    0.72     1.18 rsw       -      alerts-es-warm-2
172.16.43.235           56          90   3    0.18    0.19     0.56 rsw       -      alerts-es-warm-4
172.16.45.112           52          99  22    2.82    2.76     2.76 hir       -      alerts-es-hot-11
172.16.28.192           93         100  20    1.95    1.84     2.18 rsw       -      alerts-es-warm-1
172.16.42.124           22         100   2    0.10    0.17     0.26 crs       -      alerts-es-cold-16
172.16.8.254            53         100   2    0.13    0.14     0.22 crs       -      alerts-es-cold-5
172.16.33.245           35         100   2    0.03    0.36     0.72 crs       -      alerts-es-cold-21
172.16.48.154           39         100   2    0.14    0.18     0.46 crs       -      alerts-es-cold-3
172.16.43.132           17         100   2    0.08    0.14     0.21 crs       -      alerts-es-cold-9
172.16.56.150           11         100   2    0.12    0.24     0.35 crs       -      alerts-es-cold-19
172.16.12.77            36         100   2    1.32    1.05     0.67 crs       -      alerts-es-cold-7
172.16.16.123           17         100   2    0.51    0.45     0.42 crs       -      alerts-es-cold-4
172.16.23.27            12         100   2    0.15    0.25     0.50 crs       -      alerts-es-cold-12
172.16.6.123            62         100   2    1.10    1.06     0.94 crs       -      alerts-es-cold-20
172.16.6.12             61         100   2    0.43    0.37     0.64 crs       -      alerts-es-cold-17
172.16.1.26             55         100   2    0.09    0.64     0.62 crs       -      alerts-es-cold-18
172.16.54.75            59         100   2    0.08    0.13     0.27 crs       -      alerts-es-cold-13
172.16.52.29            22         100   2    0.13    0.34     0.49 crs       -      alerts-es-cold-10
172.16.13.18            46          83  17    0.29    0.66     0.85 mr        *      alerts-es-leader3-1
172.16.4.49             56         100 100   12.10   11.28    11.63 hir       -      alerts-es-hot-8
172.16.4.38             54          79   1    2.63    2.29     2.01 r         -      alerts-es-coord2-0
172.16.49.244            5          82   1    0.09    0.14     0.15 mr        -      alerts-es-leader3-0
172.16.35.93            46          82   0    0.62    0.79     0.82 mr        -      alerts-es-leader3-2

hot-5 has the highest load average.
GET _nodes/alerts-es-hot-5/hot_threads

::: {alerts-es-hot-5}{PcNk8skgQwyN6dc2YZRoOA}{PPqENqJ5QW2Wppxvs-KYgg}{172.16.12.17}{172.16.12.17:9300}{hir}{k8s_node_name=ip-172-16-53-249.us-east-2.compute.internal, xpack.installed=true, data=hot, transform.node=false}
   Hot threads at 2023-05-16T17:50:05.517Z, interval=500ms, busiestThreads=3, ignoreIdleThreads=true:
   
   100.0% [cpu=41.2%, other=58.8%] (500ms out of 500ms) cpu usage by thread 'elasticsearch[alerts-es-hot-5][write][T#1]'
     2/10 snapshots sharing following 26 elements
       app//org.apache.lucene.index.DefaultIndexingChain.processDocument(DefaultIndexingChain.java:491)
       app//org.apache.lucene.index.DocumentsWriterPerThread.updateDocuments(DocumentsWriterPerThread.java:208)
       app//org.apache.lucene.index.DocumentsWriter.updateDocuments(DocumentsWriter.java:415)
       app//org.apache.lucene.index.IndexWriter.updateDocuments(IndexWriter.java:1471)
       app//org.apache.lucene.index.IndexWriter.updateDocument(IndexWriter.java:1757)
       app//org.apache.lucene.index.IndexWriter.addDocument(IndexWriter.java:1400)
       app//org.elasticsearch.index.engine.InternalEngine.addDocs(InternalEngine.java:1312)
       app//org.elasticsearch.index.engine.InternalEngine.indexIntoLucene(InternalEngine.java:1248)
       app//org.elasticsearch.index.engine.InternalEngine.index(InternalEngine.java:1051)
       app//org.elasticsearch.index.shard.IndexShard.index(IndexShard.java:1066)
       app//org.elasticsearch.index.shard.IndexShard.applyIndexOperation(IndexShard.java:998)
       app//org.elasticsearch.index.shard.IndexShard.applyIndexOperationOnReplica(IndexShard.java:923)
       app//org.elasticsearch.action.bulk.TransportShardBulkAction.performOpOnReplica(TransportShardBulkAction.java:592)
       app//org.elasticsearch.action.bulk.TransportShardBulkAction.performOnReplica(TransportShardBulkAction.java:564)
       app//org.elasticsearch.action.bulk.TransportShardBulkAction.lambda$dispatchedShardOperationOnReplica$4(TransportShardBulkAction.java:526)
       app//org.elasticsearch.action.bulk.TransportShardBulkAction$$Lambda$7197/0x00000008020b2f60.get(Unknown Source)
       app//org.elasticsearch.action.ActionListener.completeWith(ActionListener.java:436)
       app//org.elasticsearch.action.bulk.TransportShardBulkAction.dispatchedShardOperationOnReplica(TransportShardBulkAction.java:525)
       app//org.elasticsearch.action.bulk.TransportShardBulkAction.dispatchedShardOperationOnReplica(TransportShardBulkAction.java:74)
       app//org.elasticsearch.action.support.replication.TransportWriteAction$2.doRun(TransportWriteAction.java:224)
       app//org.elasticsearch.common.util.concurrent.ThreadContext$ContextPreservingAbstractRunnable.doRun(ThreadContext.java:777)
       app//org.elasticsearch.common.util.concurrent.AbstractRunnable.run(AbstractRunnable.java:26)
       java.base@20.0.1/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1144)
       java.base@20.0.1/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:642)
       java.base@20.0.1/java.lang.Thread.runWith(Thread.java:1636)
       java.base@20.0.1/java.lang.Thread.run(Thread.java:1623)
     2/10 snapshots sharing following 47 elements
       java.base@20.0.1/sun.nio.ch.UnixFileDispatcherImpl.write0(Native Method)
       java.base@20.0.1/sun.nio.ch.UnixFileDispatcherImpl.write(UnixFileDispatcherImpl.java:65)
       java.base@20.0.1/sun.nio.ch.IOUtil.writeFromNativeBuffer(IOUtil.java:137)
       java.base@20.0.1/sun.nio.ch.IOUtil.write(IOUtil.java:102)
       java.base@20.0.1/sun.nio.ch.IOUtil.write(IOUtil.java:72)
       java.base@20.0.1/sun.nio.ch.FileChannelImpl.write(FileChannelImpl.java:300)
       java.base@20.0.1/sun.nio.ch.ChannelOutputStream.writeFully(ChannelOutputStream.java:68)
       java.base@20.0.1/sun.nio.ch.ChannelOutputStream.write(ChannelOutputStream.java:105)
       app//org.apache.lucene.store.FSDirectory$FSIndexOutput$1.write(FSDirectory.java:416)
       java.base@20.0.1/java.util.zip.CheckedOutputStream.write(CheckedOutputStream.java:73)
       java.base@20.0.1/java.io.BufferedOutputStream.flushBuffer(BufferedOutputStream.java:125)
       java.base@20.0.1/java.io.BufferedOutputStream.implWrite(BufferedOutputStream.java:222)
       java.base@20.0.1/java.io.BufferedOutputStream.write(BufferedOutputStream.java:200)
       app//org.apache.lucene.store.OutputStreamIndexOutput.writeBytes(OutputStreamIndexOutput.java:53)
       app//org.elasticsearch.common.lucene.store.FilterIndexOutput.writeBytes(FilterIndexOutput.java:48)
       app//org.apache.lucene.store.ByteBuffersDataOutput.copyTo(ByteBuffersDataOutput.java:287)
       app//org.apache.lucene.codecs.lucene87.LZ4WithPresetDictCompressionMode$LZ4WithPresetDictCompressor.compress(LZ4WithPresetDictCompressionMode.java:189)
       app//org.apache.lucene.codecs.compressing.CompressingStoredFieldsWriter.flush(CompressingStoredFieldsWriter.java:260)
       app//org.apache.lucene.codecs.compressing.CompressingStoredFieldsWriter.finishDocument(CompressingStoredFieldsWriter.java:172)
       app//org.apache.lucene.index.StoredFieldsConsumer.finishDocument(StoredFieldsConsumer.java:68)
       app//org.apache.lucene.index.DefaultIndexingChain.finishStoredFields(DefaultIndexingChain.java:463)
       app//org.apache.lucene.index.DefaultIndexingChain.processDocument(DefaultIndexingChain.java:499)
       app//org.apache.lucene.index.DocumentsWriterPerThread.updateDocuments(DocumentsWriterPerThread.java:208)
       app//org.apache.lucene.index.DocumentsWriter.updateDocuments(DocumentsWriter.java:415)
       app//org.apache.lucene.index.IndexWriter.updateDocuments(IndexWriter.java:1471)
       app//org.apache.lucene.index.IndexWriter.updateDocument(IndexWriter.java:1757)
       app//org.apache.lucene.index.IndexWriter.addDocument(IndexWriter.java:1400)
       app//org.elasticsearch.index.engine.InternalEngine.addDocs(InternalEngine.java:1312)
       app//org.elasticsearch.index.engine.InternalEngine.indexIntoLucene(InternalEngine.java:1248)
       app//org.elasticsearch.index.engine.InternalEngine.index(InternalEngine.java:1051)
       app//org.elasticsearch.index.shard.IndexShard.index(IndexShard.java:1066)
       app//org.elasticsearch.index.shard.IndexShard.applyIndexOperation(IndexShard.java:998)
       app//org.elasticsearch.index.shard.IndexShard.applyIndexOperationOnReplica(IndexShard.java:923)
       app//org.elasticsearch.action.bulk.TransportShardBulkAction.performOpOnReplica(TransportShardBulkAction.java:592)
       app//org.elasticsearch.action.bulk.TransportShardBulkAction.performOnReplica(TransportShardBulkAction.java:564)
       app//org.elasticsearch.action.bulk.TransportShardBulkAction.lambda$dispatchedShardOperationOnReplica$4(TransportShardBulkAction.java:526)
       app//org.elasticsearch.action.bulk.TransportShardBulkAction$$Lambda$7197/0x00000008020b2f60.get(Unknown Source)
       app//org.elasticsearch.action.ActionListener.completeWith(ActionListener.java:436)
       app//org.elasticsearch.action.bulk.TransportShardBulkAction.dispatchedShardOperationOnReplica(TransportShardBulkAction.java:525)
       app//org.elasticsearch.action.bulk.TransportShardBulkAction.dispatchedShardOperationOnReplica(TransportShardBulkAction.java:74)
       app//org.elasticsearch.action.support.replication.TransportWriteAction$2.doRun(TransportWriteAction.java:224)
       app//org.elasticsearch.common.util.concurrent.ThreadContext$ContextPreservingAbstractRunnable.doRun(ThreadContext.java:777)
       app//org.elasticsearch.common.util.concurrent.AbstractRunnable.run(AbstractRunnable.java:26)
       java.base@20.0.1/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1144)
       java.base@20.0.1/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:642)
       java.base@20.0.1/java.lang.Thread.runWith(Thread.java:1636)
       java.base@20.0.1/java.lang.Thread.run(Thread.java:1623)
     6/10 snapshots sharing following 27 elements
       java.base@20.0.1/sun.nio.ch.UnixFileDispatcherImpl.force0(Native Method)
       java.base@20.0.1/sun.nio.ch.UnixFileDispatcherImpl.force(UnixFileDispatcherImpl.java:85)
       java.base@20.0.1/sun.nio.ch.FileChannelImpl.force(FileChannelImpl.java:525)
       app//org.elasticsearch.index.translog.TranslogWriter.syncUpTo(TranslogWriter.java:481)
       app//org.elasticsearch.index.translog.Translog.ensureSynced(Translog.java:844)
       app//org.elasticsearch.index.translog.Translog.ensureSynced(Translog.java:865)
       app//org.elasticsearch.index.engine.InternalEngine.ensureTranslogSynced(InternalEngine.java:569)
       app//org.elasticsearch.index.shard.IndexShard$7.write(IndexShard.java:3743)
       app//org.elasticsearch.common.util.concurrent.AsyncIOProcessor.processList(AsyncIOProcessor.java:97)
       app//org.elasticsearch.common.util.concurrent.AsyncIOProcessor.drainAndProcessAndRelease(AsyncIOProcessor.java:85)
       app//org.elasticsearch.common.util.concurrent.AsyncIOProcessor.put(AsyncIOProcessor.java:73)
       app//org.elasticsearch.index.shard.IndexShard.sync(IndexShard.java:3766)
       app//org.elasticsearch.action.support.replication.TransportWriteAction$AsyncAfterWriteAction.run(TransportWriteAction.java:455)
       app//org.elasticsearch.action.support.replication.TransportWriteAction$WriteReplicaResult.runPostReplicaActions(TransportWriteAction.java:336)
       app//org.elasticsearch.action.support.replication.TransportReplicationAction$AsyncReplicaAction.lambda$onResponse$2(TransportReplicationAction.java:669)
       app//org.elasticsearch.action.support.replication.TransportReplicationAction$AsyncReplicaAction$$Lambda$7164/0x00000008020ab840.accept(Unknown Source)
       app//org.elasticsearch.action.ActionListener$1.onResponse(ActionListener.java:136)
       app//org.elasticsearch.action.ActionListener.completeWith(ActionListener.java:447)
       app//org.elasticsearch.action.bulk.TransportShardBulkAction.dispatchedShardOperationOnReplica(TransportShardBulkAction.java:525)
       app//org.elasticsearch.action.bulk.TransportShardBulkAction.dispatchedShardOperationOnReplica(TransportShardBulkAction.java:74)
       app//org.elasticsearch.action.support.replication.TransportWriteAction$2.doRun(TransportWriteAction.java:224)
       app//org.elasticsearch.common.util.concurrent.ThreadContext$ContextPreservingAbstractRunnable.doRun(ThreadContext.java:777)
       app//org.elasticsearch.common.util.concurrent.AbstractRunnable.run(AbstractRunnable.java:26)
       java.base@20.0.1/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1144)
       java.base@20.0.1/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:642)
       java.base@20.0.1/java.lang.Thread.runWith(Thread.java:1636)
       java.base@20.0.1/java.lang.Thread.run(Thread.java:1623)
   
   100.0% [cpu=38.6%, other=61.4%] (500ms out of 500ms) cpu usage by thread 'elasticsearch[alerts-es-hot-5][[alerts-account442774118064-traffic-000006][0]: Lucene Merge Thread #184]'
     10/10 snapshots sharing following 12 elements
       app//org.apache.lucene.codecs.blocktree.BlockTreeTermsWriter$TermsWriter.write(BlockTreeTermsWriter.java:907)
       app//org.apache.lucene.codecs.blocktree.BlockTreeTermsWriter.write(BlockTreeTermsWriter.java:318)
       app//org.apache.lucene.codecs.FieldsConsumer.merge(FieldsConsumer.java:105)
       app//org.apache.lucene.codecs.perfield.PerFieldPostingsFormat$FieldsWriter.merge(PerFieldPostingsFormat.java:197)
       app//org.apache.lucene.index.SegmentMerger.mergeTerms(SegmentMerger.java:244)
       app//org.apache.lucene.index.SegmentMerger.merge(SegmentMerger.java:139)
       app//org.apache.lucene.index.IndexWriter.mergeMiddle(IndexWriter.java:4757)
       app//org.apache.lucene.index.IndexWriter.merge(IndexWriter.java:4361)
       app//org.apache.lucene.index.IndexWriter$IndexWriterMergeSource.merge(IndexWriter.java:5920)
       app//org.apache.lucene.index.ConcurrentMergeScheduler.doMerge(ConcurrentMergeScheduler.java:626)
       app//org.elasticsearch.index.engine.ElasticsearchConcurrentMergeScheduler.doMerge(ElasticsearchConcurrentMergeScheduler.java:94)
       app//org.apache.lucene.index.ConcurrentMergeScheduler$MergeThread.run(ConcurrentMergeScheduler.java:684)
   
   100.0% [cpu=37.3%, other=62.7%] (500ms out of 500ms) cpu usage by thread 'elasticsearch[alerts-es-hot-5][write][T#6]'
     2/10 snapshots sharing following 21 elements
       app//org.elasticsearch.index.mapper.DocumentParser.parseObjectOrNested(DocumentParser.java:477)
       app//org.elasticsearch.index.mapper.DocumentParser.internalParseDocument(DocumentParser.java:145)
       app//org.elasticsearch.index.mapper.DocumentParser.parseDocument(DocumentParser.java:91)
       app//org.elasticsearch.index.mapper.DocumentMapper.parse(DocumentMapper.java:83)
       app//org.elasticsearch.index.shard.IndexShard.prepareIndex(IndexShard.java:1025)
       app//org.elasticsearch.index.shard.IndexShard.applyIndexOperation(IndexShard.java:971)
       app//org.elasticsearch.index.shard.IndexShard.applyIndexOperationOnReplica(IndexShard.java:923)
       app//org.elasticsearch.action.bulk.TransportShardBulkAction.performOpOnReplica(TransportShardBulkAction.java:592)
       app//org.elasticsearch.action.bulk.TransportShardBulkAction.performOnReplica(TransportShardBulkAction.java:564)
       app//org.elasticsearch.action.bulk.TransportShardBulkAction.lambda$dispatchedShardOperationOnReplica$4(TransportShardBulkAction.java:526)
       app//org.elasticsearch.action.bulk.TransportShardBulkAction$$Lambda$7197/0x00000008020b2f60.get(Unknown Source)
       app//org.elasticsearch.action.ActionListener.completeWith(ActionListener.java:436)
       app//org.elasticsearch.action.bulk.TransportShardBulkAction.dispatchedShardOperationOnReplica(TransportShardBulkAction.java:525)
       app//org.elasticsearch.action.bulk.TransportShardBulkAction.dispatchedShardOperationOnReplica(TransportShardBulkAction.java:74)
       app//org.elasticsearch.action.support.replication.TransportWriteAction$2.doRun(TransportWriteAction.java:224)
       app//org.elasticsearch.common.util.concurrent.ThreadContext$ContextPreservingAbstractRunnable.doRun(ThreadContext.java:777)
       app//org.elasticsearch.common.util.concurrent.AbstractRunnable.run(AbstractRunnable.java:26)
       java.base@20.0.1/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1144)
       java.base@20.0.1/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:642)
       java.base@20.0.1/java.lang.Thread.runWith(Thread.java:1636)
       java.base@20.0.1/java.lang.Thread.run(Thread.java:1623)
     5/10 snapshots sharing following 47 elements
       java.base@20.0.1/sun.nio.ch.UnixFileDispatcherImpl.write0(Native Method)
       java.base@20.0.1/sun.nio.ch.UnixFileDispatcherImpl.write(UnixFileDispatcherImpl.java:65)
       java.base@20.0.1/sun.nio.ch.IOUtil.writeFromNativeBuffer(IOUtil.java:137)
       java.base@20.0.1/sun.nio.ch.IOUtil.write(IOUtil.java:102)
       java.base@20.0.1/sun.nio.ch.IOUtil.write(IOUtil.java:72)
       java.base@20.0.1/sun.nio.ch.FileChannelImpl.write(FileChannelImpl.java:300)
       java.base@20.0.1/sun.nio.ch.ChannelOutputStream.writeFully(ChannelOutputStream.java:68)
       java.base@20.0.1/sun.nio.ch.ChannelOutputStream.write(ChannelOutputStream.java:105)
       app//org.apache.lucene.store.FSDirectory$FSIndexOutput$1.write(FSDirectory.java:416)
       java.base@20.0.1/java.util.zip.CheckedOutputStream.write(CheckedOutputStream.java:73)
       java.base@20.0.1/java.io.BufferedOutputStream.flushBuffer(BufferedOutputStream.java:125)
       java.base@20.0.1/java.io.BufferedOutputStream.implWrite(BufferedOutputStream.java:222)
       java.base@20.0.1/java.io.BufferedOutputStream.write(BufferedOutputStream.java:200)
       app//org.apache.lucene.store.OutputStreamIndexOutput.writeBytes(OutputStreamIndexOutput.java:53)
       app//org.elasticsearch.common.lucene.store.FilterIndexOutput.writeBytes(FilterIndexOutput.java:48)
       app//org.apache.lucene.store.ByteBuffersDataOutput.copyTo(ByteBuffersDataOutput.java:287)
       app//org.apache.lucene.codecs.lucene87.LZ4WithPresetDictCompressionMode$LZ4WithPresetDictCompressor.compress(LZ4WithPresetDictCompressionMode.java:189)
       app//org.apache.lucene.codecs.compressing.CompressingStoredFieldsWriter.flush(CompressingStoredFieldsWriter.java:260)
       app//org.apache.lucene.codecs.compressing.CompressingStoredFieldsWriter.finishDocument(CompressingStoredFieldsWriter.java:172)
       app//org.apache.lucene.index.StoredFieldsConsumer.finishDocument(StoredFieldsConsumer.java:68)
       app//org.apache.lucene.index.DefaultIndexingChain.finishStoredFields(DefaultIndexingChain.java:463)
       app//org.apache.lucene.index.DefaultIndexingChain.processDocument(DefaultIndexingChain.java:499)
       app//org.apache.lucene.index.DocumentsWriterPerThread.updateDocuments(DocumentsWriterPerThread.java:208)
       app//org.apache.lucene.index.DocumentsWriter.updateDocuments(DocumentsWriter.java:415)
       app//org.apache.lucene.index.IndexWriter.updateDocuments(IndexWriter.java:1471)
       app//org.apache.lucene.index.IndexWriter.updateDocument(IndexWriter.java:1757)
       app//org.apache.lucene.index.IndexWriter.addDocument(IndexWriter.java:1400)
       app//org.elasticsearch.index.engine.InternalEngine.addDocs(InternalEngine.java:1312)
       app//org.elasticsearch.index.engine.InternalEngine.indexIntoLucene(InternalEngine.java:1248)
       app//org.elasticsearch.index.engine.InternalEngine.index(InternalEngine.java:1051)
       app//org.elasticsearch.index.shard.IndexShard.index(IndexShard.java:1066)
       app//org.elasticsearch.index.shard.IndexShard.applyIndexOperation(IndexShard.java:998)
       app//org.elasticsearch.index.shard.IndexShard.applyIndexOperationOnReplica(IndexShard.java:923)
       app//org.elasticsearch.action.bulk.TransportShardBulkAction.performOpOnReplica(TransportShardBulkAction.java:592)
       app//org.elasticsearch.action.bulk.TransportShardBulkAction.performOnReplica(TransportShardBulkAction.java:564)
       app//org.elasticsearch.action.bulk.TransportShardBulkAction.lambda$dispatchedShardOperationOnReplica$4(TransportShardBulkAction.java:526)
       app//org.elasticsearch.action.bulk.TransportShardBulkAction$$Lambda$7197/0x00000008020b2f60.get(Unknown Source)
       app//org.elasticsearch.action.ActionListener.completeWith(ActionListener.java:436)
       app//org.elasticsearch.action.bulk.TransportShardBulkAction.dispatchedShardOperationOnReplica(TransportShardBulkAction.java:525)
       app//org.elasticsearch.action.bulk.TransportShardBulkAction.dispatchedShardOperationOnReplica(TransportShardBulkAction.java:74)
       app//org.elasticsearch.action.support.replication.TransportWriteAction$2.doRun(TransportWriteAction.java:224)
       app//org.elasticsearch.common.util.concurrent.ThreadContext$ContextPreservingAbstractRunnable.doRun(ThreadContext.java:777)
       app//org.elasticsearch.common.util.concurrent.AbstractRunnable.run(AbstractRunnable.java:26)
       java.base@20.0.1/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1144)
       java.base@20.0.1/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:642)
       java.base@20.0.1/java.lang.Thread.runWith(Thread.java:1636)
       java.base@20.0.1/java.lang.Thread.run(Thread.java:1623)
     2/10 snapshots sharing following 29 elements
       app//org.apache.lucene.analysis.FilteringTokenFilter.incrementToken(FilteringTokenFilter.java:49)
       app//org.apache.lucene.index.DefaultIndexingChain$PerField.invert(DefaultIndexingChain.java:924)
       app//org.apache.lucene.index.DefaultIndexingChain.processField(DefaultIndexingChain.java:527)
       app//org.apache.lucene.index.DefaultIndexingChain.processDocument(DefaultIndexingChain.java:491)
       app//org.apache.lucene.index.DocumentsWriterPerThread.updateDocuments(DocumentsWriterPerThread.java:208)
       app//org.apache.lucene.index.DocumentsWriter.updateDocuments(DocumentsWriter.java:415)
       app//org.apache.lucene.index.IndexWriter.updateDocuments(IndexWriter.java:1471)
       app//org.apache.lucene.index.IndexWriter.updateDocument(IndexWriter.java:1757)
       app//org.apache.lucene.index.IndexWriter.addDocument(IndexWriter.java:1400)
       app//org.elasticsearch.index.engine.InternalEngine.addDocs(InternalEngine.java:1312)
       app//org.elasticsearch.index.engine.InternalEngine.indexIntoLucene(InternalEngine.java:1248)
       app//org.elasticsearch.index.engine.InternalEngine.index(InternalEngine.java:1051)
       app//org.elasticsearch.index.shard.IndexShard.index(IndexShard.java:1066)
       app//org.elasticsearch.index.shard.IndexShard.applyIndexOperation(IndexShard.java:998)
       app//org.elasticsearch.index.shard.IndexShard.applyIndexOperationOnReplica(IndexShard.java:923)
       app//org.elasticsearch.action.bulk.TransportShardBulkAction.performOpOnReplica(TransportShardBulkAction.java:592)
       app//org.elasticsearch.action.bulk.TransportShardBulkAction.performOnReplica(TransportShardBulkAction.java:564)
       app//org.elasticsearch.action.bulk.TransportShardBulkAction.lambda$dispatchedShardOperationOnReplica$4(TransportShardBulkAction.java:526)
       app//org.elasticsearch.action.bulk.TransportShardBulkAction$$Lambda$7197/0x00000008020b2f60.get(Unknown Source)
       app//org.elasticsearch.action.ActionListener.completeWith(ActionListener.java:436)
       app//org.elasticsearch.action.bulk.TransportShardBulkAction.dispatchedShardOperationOnReplica(TransportShardBulkAction.java:525)
       app//org.elasticsearch.action.bulk.TransportShardBulkAction.dispatchedShardOperationOnReplica(TransportShardBulkAction.java:74)
       app//org.elasticsearch.action.support.replication.TransportWriteAction$2.doRun(TransportWriteAction.java:224)
       app//org.elasticsearch.common.util.concurrent.ThreadContext$ContextPreservingAbstractRunnable.doRun(ThreadContext.java:777)
       app//org.elasticsearch.common.util.concurrent.AbstractRunnable.run(AbstractRunnable.java:26)
       java.base@20.0.1/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1144)
       java.base@20.0.1/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:642)
       java.base@20.0.1/java.lang.Thread.runWith(Thread.java:1636)
       java.base@20.0.1/java.lang.Thread.run(Thread.java:1623)
     unique snapshot
       app//org.apache.lucene.index.TermsHashPerField.writeByte(TermsHashPerField.java:212)
       app//org.apache.lucene.index.TermsHashPerField.writeVInt(TermsHashPerField.java:233)
       app//org.apache.lucene.index.FreqProxTermsWriterPerField.addTerm(FreqProxTermsWriterPerField.java:154)
       app//org.apache.lucene.index.TermsHashPerField.positionStreamSlice(TermsHashPerField.java:200)
       app//org.apache.lucene.index.TermsHashPerField.add(TermsHashPerField.java:188)
       app//org.apache.lucene.index.DefaultIndexingChain$PerField.invert(DefaultIndexingChain.java:974)
       app//org.apache.lucene.index.DefaultIndexingChain.processField(DefaultIndexingChain.java:527)
       app//org.apache.lucene.index.DefaultIndexingChain.processDocument(DefaultIndexingChain.java:491)
       app//org.apache.lucene.index.DocumentsWriterPerThread.updateDocuments(DocumentsWriterPerThread.java:208)
       app//org.apache.lucene.index.DocumentsWriter.updateDocuments(DocumentsWriter.java:415)
       app//org.apache.lucene.index.IndexWriter.updateDocuments(IndexWriter.java:1471)
       app//org.apache.lucene.index.IndexWriter.updateDocument(IndexWriter.java:1757)
       app//org.apache.lucene.index.IndexWriter.addDocument(IndexWriter.java:1400)
       app//org.elasticsearch.index.engine.InternalEngine.addDocs(InternalEngine.java:1312)
       app//org.elasticsearch.index.engine.InternalEngine.indexIntoLucene(InternalEngine.java:1248)
       app//org.elasticsearch.index.engine.InternalEngine.index(InternalEngine.java:1051)
       app//org.elasticsearch.index.shard.IndexShard.index(IndexShard.java:1066)
       app//org.elasticsearch.index.shard.IndexShard.applyIndexOperation(IndexShard.java:998)
       app//org.elasticsearch.index.shard.IndexShard.applyIndexOperationOnReplica(IndexShard.java:923)
       app//org.elasticsearch.action.bulk.TransportShardBulkAction.performOpOnReplica(TransportShardBulkAction.java:592)
       app//org.elasticsearch.action.bulk.TransportShardBulkAction.performOnReplica(TransportShardBulkAction.java:564)
       app//org.elasticsearch.action.bulk.TransportShardBulkAction.lambda$dispatchedShardOperationOnReplica$4(TransportShardBulkAction.java:526)
       app//org.elasticsearch.action.bulk.TransportShardBulkAction$$Lambda$7197/0x00000008020b2f60.get(Unknown Source)
       app//org.elasticsearch.action.ActionListener.completeWith(ActionListener.java:436)
       app//org.elasticsearch.action.bulk.TransportShardBulkAction.dispatchedShardOperationOnReplica(TransportShardBulkAction.java:525)
       app//org.elasticsearch.action.bulk.TransportShardBulkAction.dispatchedShardOperationOnReplica(TransportShardBulkAction.java:74)
       app//org.elasticsearch.action.support.replication.TransportWriteAction$2.doRun(TransportWriteAction.java:224)
       app//org.elasticsearch.common.util.concurrent.ThreadContext$ContextPreservingAbstractRunnable.doRun(ThreadContext.java:777)
       app//org.elasticsearch.common.util.concurrent.AbstractRunnable.run(AbstractRunnable.java:26)
       java.base@20.0.1/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1144)
       java.base@20.0.1/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:642)
       java.base@20.0.1/java.lang.Thread.runWith(Thread.java:1636)
       java.base@20.0.1/java.lang.Thread.run(Thread.java:1623)

Zero window ack stats on hot-5:

    TCPFromZeroWindowAdv: 48083
    TCPToZeroWindowAdv: 48083
    TCPWantZeroWindowAdv: 63227

The rest of the output from netstat -s on hot-5:

Ip:
    Forwarding: 1
    1313302656 total packets received
    0 forwarded
    0 incoming packets discarded
    1313302656 incoming packets delivered
    1190592119 requests sent out
Icmp:
    160 ICMP messages received
    16 input ICMP message failed
    ICMP input histogram:
        destination unreachable: 160
    0 ICMP messages sent
    0 ICMP messages failed
    ICMP output histogram:
IcmpMsg:
        InType3: 160
Tcp:
    215125 active connection openings
    227902 passive connection openings
    158 failed connection attempts
    34 connection resets received
    1048 connections established
    1313300078 segments received
    4664710260 segments sent out
    267640 segments retransmitted
    0 bad segments received
    3597 resets sent
Udp:
    2418 packets received
    0 packets to unknown port received
    0 packet receive errors
    2418 packets sent
    0 receive buffer errors
    0 send buffer errors
UdpLite:
TcpExt:
    144 ICMP packets dropped because they were out-of-window
    212990 TCP sockets finished time wait in fast timer
    154 time wait sockets recycled by time stamp
    1224 packetes rejected in established connections because of timestamp
    4455443 delayed acks sent
    22130 delayed acks further delayed because of locked socket
    Quick ack mode was activated 103578 times
    374020130 packet headers predicted
    522114537 acknowledgments not containing data payload received
    180567608 predicted acknowledgments
    TCPSackRecovery: 13556
    Detected reordering 476066 times using SACK
    Detected reordering 1910 times using time stamp
    338 congestion windows fully recovered without slow start
    691 congestion windows partially recovered using Hoe heuristic
    TCPDSACKUndo: 6657
    33 congestion windows recovered without slow start after partial ack
    TCPLostRetransmit: 564
    164471 fast retransmits
    41 retransmits in slow start
    TCPTimeouts: 70
    TCPLossProbes: 178847
    TCPLossProbeRecovery: 135
    TCPSackRecoveryFail: 18
    TCPBacklogCoalesce: 43845032
    TCPDSACKOldSent: 103581
    TCPDSACKRecv: 223315
    323 connections reset due to unexpected data
    934 connections reset due to early user close
    18 connections aborted due to timeout
    TCPDSACKIgnoredOld: 15384
    TCPDSACKIgnoredNoUndo: 73595
    TCPSpuriousRTOs: 2
    TCPSackShifted: 58991
    TCPSackMerged: 123263
    TCPSackShiftFallback: 227303
    TCPRcvCoalesce: 109748436
    TCPOFOQueue: 294200
    TCPSpuriousRtxHostQueues: 11080
    TCPAutoCorking: 1832432
    TCPFromZeroWindowAdv: 48083
    TCPToZeroWindowAdv: 48083
    TCPWantZeroWindowAdv: 63227
    TCPSynRetrans: 11
    TCPOrigDataSent: 4378671662
    TCPHystartTrainDetect: 2037
    TCPHystartTrainCwnd: 67367
    TCPACKSkippedPAWS: 754
    TCPACKSkippedSeq: 32
    TCPWinProbe: 547
    TCPKeepAlive: 2428111
    TCPDelivered: 4379107893
    TCPAckCompressed: 19108
IpExt:
    InOctets: 16329672415841
    OutOctets: 28366374129326
    InNoECTPkts: 2731316941

And ifconfig -a on that node:

eth0: flags=4163<UP,BROADCAST,RUNNING,MULTICAST>  mtu 9001
        inet 172.16.12.17  netmask 255.255.255.255  broadcast 0.0.0.0
        ether 6e:79:c0:8f:66:65  txqueuelen 0  (Ethernet)
        RX packets 1311011174  bytes 16366093179460 (16.3 TB)
        RX errors 0  dropped 0  overruns 0  frame 0
        TX packets 1188175544  bytes 28399300774981 (28.3 TB)
        TX errors 0  dropped 0 overruns 0  carrier 0  collisions 0

I recall running a benchmark a very long time ago against a set of i2.2xlarge instances. I indexed log data into a single index and varied the number of primary shards between runs. As far as I remember, I reached peak throughput already at 2 shards per node, and after that throughput decreased as the number of indices and shards written to increased. I only went up to 10 or so, nowhere near your 440 indices. This was on a much older version, though, so I am sure there have been improvements. I would suspect that writing to fewer indices, or at least having each bulk request target a minimal number of indices and shards, would improve indexing throughput.

Slight clarification - each bulk request only writes to one index. I collect batches at the source (30s windows of up to 400 records), which are sent to Kinesis, then picked up and written to Elasticsearch.

I'm trying to keep each tenant's data in separate indices so customers can choose different retention periods, and to improve query performance for a single tenant since that's our most frequent search context. I could experiment with writing to a single index if that's my only option, or with splitting tenants into groups (e.g. account id mod X to assign buckets), but I'd strongly prefer to keep them separate if possible.

It looks like you have 4 or 5 nodes with much higher CPU usage and load than the rest. Do these hold a larger portion of the hotter indices? Are they maybe receiving more traffic?

Yeah, I've experimented with moving the hottest shards to the nodes with the lowest load, but it's a bit fiddly and only lasts until rollover creates new indices. I could write a job to manage this, but that feels like a dirty solution.

If I were on 8.x I'd have cluster.routing.allocation.balance.write_load as an option for tuning shard allocation based on write load, but sadly I can't move to 8 yet.

I'll experiment with moving the hottest shards around again now that I have more metrics and see what the impact is on my max indexing rate.
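
(For anyone following along, a manual move is a cluster reroute command along these lines; the index, shard number, and node names are illustrative.)

    POST _cluster/reroute
    {
      "commands": [
        {
          "move": {
            "index": "alerts-account442774118064-traffic-000006",
            "shard": 0,
            "from_node": "alerts-es-hot-5",
            "to_node": "alerts-es-hot-2"
          }
        }
      ]
    }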

Are there several shards from the same hot index on these nodes, or just shards from different hot indices? If it is the former, you can use the index.routing.allocation.total_shards_per_node index setting to limit the number of shards from a specific index that can reside on a single node. This counts both primaries and replicas, so you need to set it at a level that still allows all of them to be allocated. You may also need to remove the setting when the indices move to the warm tier.
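
For example, for an index with 1 primary and 2 replicas, a setting like the one below (index name illustrative) ensures the three copies of each shard end up on three different nodes:

    PUT alerts-account442774118064-traffic-000006/_settings
    {
      "index.routing.allocation.total_shards_per_node": 1
    }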

Yes, indeed, these nodes are not working very hard on indexing at all. That indicates to me that the bottleneck is earlier in the process than indexing, so adding nodes or mucking around with shard allocation likely won't achieve anything. Which nodes are handling the HTTP requests? Are you using any ingest pipelines?

(edit: unless there's just a hotspot on node 5, but even then I'd expect a much larger queue to have built up there)

Everything is running within k8s, and the clients are connecting using a Service named alerts-es-ingest, which selects the hot nodes using elasticsearch.k8s.elastic.co/node-data_hot: "true". That service has a ClusterIP - would a headless service be better in this case?

No ingest pipeline within elasticsearch - I have a separate pipeline that predates our use of elasticsearch.

In which case this strongly suggests that the bottleneck is outside Elasticsearch.

I switched the service to headless, but haven't seen a change in ingest rate.

EC2 metrics for the hot, warm, and cold tiers over the past 2h (charts omitted).

Here is one of the r5a.2xlarge workers running a client pod that reports slow writes (30m period; a new pod was deployed when switching to the headless service and adding metrics). It reported 1000 batches that took over 3s to write to Elasticsearch, e.g. 5s to write 1k documents with a total size of 2.2 MB, and 3s for 400 records with a total size of 1.1 MB.

Revisiting the zero window ack stats from yesterday, hot-5 had:

    TCPFromZeroWindowAdv: 48083
    TCPToZeroWindowAdv: 48083
    TCPWantZeroWindowAdv: 63227

Today:

    TCPFromZeroWindowAdv: 48260
    TCPToZeroWindowAdv: 48260
    TCPWantZeroWindowAdv: 63480

So the numbers are increasing, but slowly. When buffers fill, I generally assume the process reading the data is backlogged and packets end up being dropped once the buffer is full, which would point to Elasticsearch not being able to read the requests fast enough. What am I missing?

This is latency, but your earlier concerns were about throughput, and these things are fairly orthogonal. Figuring out occasional latency spikes can be tricky, and typically needs you to collect diagnostic info just before/during/just after the issue.

I was editing the comment above to add today's zero-window ack stats, but you responded before I was done.

My early troubleshooting attempts were based on what seemed sensible to me, but I've only hit dead ends so far, so my focus has shifted to any "that's funny" logs/metrics. Latency on individual writes doesn't concern me much, except when the average latency grows, since that reduces the throughput for that thread.

Depends whether it's a send buffer or a receive buffer which is filling up. It's Elasticsearch's job to consume the receive buffer, so if you see one of them overflowing then that's an ES problem, but if it's just the send buffer which is full then it's the network not delivering data fast enough.

These network stats indicate that the receive buffer was full on less than 200 occasions over the course of a day, which sounds fine to me.

True, but I would expect there to be enough other client threads for this not to matter too much. Is that not the case?

Partially. The ingestion pipeline uses Kinesis as the queue before writing to Elasticsearch. Kinesis scales horizontally by adding new shards, each with a designated hash range for the partition key. Those shards are automatically assigned to workers by the client library, and processing of a single shard is always single-threaded. This means that delays in writing one batch of records to elasticsearch will cause slower processing for that shard, limiting the pipeline throughput.

I could add some batching after reading from Kinesis, but that would require more checkpointing and de-duplication checks if a job restarted. The current design is attractively simple: I read a compressed batch of 400-1000 objects for a single tenant from Kinesis, bulk index them, then update the checkpoint in DynamoDB. Adding parallel processing after the Kinesis read would add complexity and new failure modes, so I've been focusing on the current bottleneck when writing to Elasticsearch. There are other options, e.g. oversharding Kinesis, but I'll avoid those tangents.

I captured metrics around each part of the ingestion pipeline, and the write to Elasticsearch dominates the processing time: an average of 48s out of every minute is spent writing to Elasticsearch, while the next most expensive operation averages 4s per minute.

This sounds very low given that you have 12 powerful hot nodes. I am assuming you are sending requests directly to the hot nodes, and in that case it sounds like you only have six connections in use at any point. Latency for large bulk requests can grow, so whenever I have benchmarked clusters for throughput I have typically used at least that many connections per hot node.