Reduce bulk rejections

Hi, I am performing load testing on a 150-data-node Elasticsearch cluster (AWS i3.2xlarge instances), so the write thread pool size is 8 and the queue size is 200.

I am indexing into Elasticsearch from 20 ECS tasks, and each task uses 6 threads to ingest via the BulkProcessor with 6 concurrent bulk requests.

However, I see bulk rejections from many of the data nodes. When I captured a thread dump from one of those nodes, I saw the stack trace below for one of the threads in the write pool.

java.lang.Thread.State: BLOCKED (on object monitor)
at org.elasticsearch.index.translog.TranslogWriter.add(TranslogWriter.java:171)
- waiting to lock <0x000000060393e258> (a org.elasticsearch.index.translog.TranslogWriter)
at org.elasticsearch.index.translog.Translog.add(Translog.java:544)
at org.elasticsearch.index.engine.InternalEngine.index(InternalEngine.java:957)
at org.elasticsearch.index.shard.IndexShard.index(IndexShard.java:815)
at org.elasticsearch.index.shard.IndexShard.applyIndexOperation(IndexShard.java:787)
at org.elasticsearch.index.shard.IndexShard.applyIndexOperationOnPrimary(IndexShard.java:744)
at org.elasticsearch.action.bulk.TransportShardBulkAction.executeBulkItemRequest(TransportShardBulkAction.java:267)
at org.elasticsearch.action.bulk.TransportShardBulkAction$2.doRun(TransportShardBulkAction.java:157)
at org.elasticsearch.common.util.concurrent.AbstractRunnable.run(AbstractRunnable.java:37)
at org.elasticsearch.action.bulk.TransportShardBulkAction.performOnPrimary(TransportShardBulkAction.java:202)
at org.elasticsearch.action.bulk.TransportShardBulkAction.shardOperationOnPrimary(TransportShardBulkAction.java:114)
at org.elasticsearch.action.bulk.TransportShardBulkAction.shardOperationOnPrimary(TransportShardBulkAction.java:81)
at org.elasticsearch.action.support.replication.TransportReplicationAction$PrimaryShardReference.perform(TransportReplicationAction.java:895)
at org.elasticsearch.action.support.replication.ReplicationOperation.execute(ReplicationOperation.java:109)
at org.elasticsearch.action.support.replication.TransportReplicationAction$AsyncPrimaryAction.runWithPrimaryShardReference(TransportReplicationAction.java:374)
at org.elasticsearch.action.support.replication.TransportReplicationAction$AsyncPrimaryAction.lambda$doRun$0(TransportReplicationAction.java:297)
at org.elasticsearch.action.support.replication.TransportReplicationAction$AsyncPrimaryAction$$Lambda$2961/0x00000008017aec40.accept(Unknown Source)
at org.elasticsearch.action.ActionListener$1.onResponse(ActionListener.java:63)
at org.elasticsearch.index.shard.IndexShard.lambda$wrapPrimaryOperationPermitListener$24(IndexShard.java:2802)
at org.elasticsearch.index.shard.IndexShard$$Lambda$2963/0x00000008017afc40.accept(Unknown Source)
at org.elasticsearch.action.ActionListener$3.onResponse(ActionListener.java:113)
at org.elasticsearch.index.shard.IndexShardOperationPermits.acquire(IndexShardOperationPermits.java:285)
at org.elasticsearch.index.shard.IndexShardOperationPermits.acquire(IndexShardOperationPermits.java:237)
at org.elasticsearch.index.shard.IndexShard.acquirePrimaryOperationPermit(IndexShard.java:2776)
at org.elasticsearch.action.support.replication.TransportReplicationAction.acquirePrimaryOperationPermit(TransportReplicationAction.java:836)
at org.elasticsearch.action.support.replication.TransportReplicationAction$AsyncPrimaryAction.doRun(TransportReplicationAction.java:293)
at org.elasticsearch.common.util.concurrent.AbstractRunnable.run(AbstractRunnable.java:37)
at org.elasticsearch.action.support.replication.TransportReplicationAction.handlePrimaryRequest(TransportReplicationAction.java:256)
at org.elasticsearch.action.support.replication.TransportReplicationAction$$Lambda$2201/0x000000080143b040.messageReceived(Unknown Source)
at com.amazon.opendistro.elasticsearch.performanceanalyzer.transport.PerformanceAnalyzerTransportRequestHandler.messageReceived(PerformanceAnalyzerTransportRequestHandler.java:48)
at org.elasticsearch.transport.RequestHandlerRegistry.processMessageReceived(RequestHandlerRegistry.java:63)
at org.elasticsearch.transport.TransportService$8.doRun(TransportService.java:801)
at org.elasticsearch.common.util.concurrent.ThreadContext$ContextPreservingAbstractRunnable.doRun(ThreadContext.java:695)
at org.elasticsearch.common.util.concurrent.AbstractRunnable.run(AbstractRunnable.java:37)
at java.util.concurrent.ThreadPoolExecutor.runWorker(java.base@14.0.1/ThreadPoolExecutor.java:1130)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(java.base@14.0.1/ThreadPoolExecutor.java:630)
at java.lang.Thread.run(java.base@14.0.1/Thread.java:832)
Locked ownable synchronizers:
- <0x0000000083347370> (a java.util.concurrent.ThreadPoolExecutor$Worker)
- <0x0000000507f987d0> (a java.util.concurrent.locks.ReentrantLock$NonfairSync)

It looks like one of the write threads holds the lock on the TranslogWriter object and the other threads are waiting to acquire the lock on that same object.

Is there anything I can do to improve throughput and reduce the bulk rejections?
Would having dedicated ingest nodes help?
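For reference, the per-node rejection counts can be watched during the test with the _cat thread pool API; the sketch below uses the low-level Java REST client, and the host name is a placeholder rather than anything from this thread.

import org.apache.http.HttpHost;
import org.apache.http.util.EntityUtils;
import org.elasticsearch.client.Request;
import org.elasticsearch.client.Response;
import org.elasticsearch.client.RestClient;

public class WriteRejections {
    public static void main(String[] args) throws Exception {
        // "es-coordinator" is a placeholder for one of the cluster's HTTP endpoints
        try (RestClient client = RestClient.builder(new HttpHost("es-coordinator", 9200, "http")).build()) {
            // _cat/thread_pool/write lists active, queued and rejected tasks per node
            Request request = new Request("GET", "/_cat/thread_pool/write");
            request.addParameter("v", "true");
            request.addParameter("h", "node_name,active,queue,rejected");
            Response response = client.performRequest(request);
            System.out.println(EntityUtils.toString(response.getEntity()));
        }
    }
}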

Below are the settings I use; a sketch of applying the index-level translog settings via the Java client follows the list.
150 data nodes
150 primary shards with 1 replica
index.translog.durability: async
index.translog.flush_threshold_size: 1024MB
Elasticsearch version 7.8
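Both index.translog settings are dynamic, so they can be applied to an existing index. The sketch below uses the 7.x high-level Java REST client; the index name "my-index" and the client instance are placeholders, not taken from this thread.

import java.io.IOException;
import org.elasticsearch.action.admin.indices.settings.put.UpdateSettingsRequest;
import org.elasticsearch.client.RequestOptions;
import org.elasticsearch.client.RestHighLevelClient;
import org.elasticsearch.common.settings.Settings;

public class TranslogSettings {
    // client is an already-built RestHighLevelClient; "my-index" is a placeholder index name
    static void applyTranslogSettings(RestHighLevelClient client) throws IOException {
        UpdateSettingsRequest request = new UpdateSettingsRequest("my-index")
                .settings(Settings.builder()
                        // fsync and commit the translog in the background rather than on every request
                        .put("index.translog.durability", "async")
                        // let the translog grow to 1 GB before a flush is forced
                        .put("index.translog.flush_threshold_size", "1024mb")
                        .build());
        client.indices().putSettings(request, RequestOptions.DEFAULT);
    }
}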

Is your data immutable or do you need to update it? How large are the documents on average? How large are the bulk requests you are using?

@Christian_Dahlqvist,
The data is not immutable; there are updates as well.
Documents are around 1 KB on average.
bulk actions = 5000
bulk size = 1GB
6 concurrent bulk requests (the corresponding BulkProcessor configuration is sketched below)
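A minimal sketch of that configuration with the 7.x high-level Java REST client; the client instance and the listener bodies are placeholders, not taken from this thread.

import java.util.function.BiConsumer;
import org.elasticsearch.action.ActionListener;
import org.elasticsearch.action.bulk.BulkProcessor;
import org.elasticsearch.action.bulk.BulkRequest;
import org.elasticsearch.action.bulk.BulkResponse;
import org.elasticsearch.client.RequestOptions;
import org.elasticsearch.client.RestHighLevelClient;
import org.elasticsearch.common.unit.ByteSizeUnit;
import org.elasticsearch.common.unit.ByteSizeValue;

public class IngestBulkProcessor {
    static BulkProcessor build(RestHighLevelClient client) {
        // each flush is sent as an asynchronous bulk request through the client
        BiConsumer<BulkRequest, ActionListener<BulkResponse>> consumer =
                (request, bulkListener) -> client.bulkAsync(request, RequestOptions.DEFAULT, bulkListener);

        BulkProcessor.Listener listener = new BulkProcessor.Listener() {
            @Override
            public void beforeBulk(long executionId, BulkRequest request) { }

            @Override
            public void afterBulk(long executionId, BulkRequest request, BulkResponse response) {
                if (response.hasFailures()) {
                    // item-level failures, including 429 rejections, show up here
                }
            }

            @Override
            public void afterBulk(long executionId, BulkRequest request, Throwable failure) {
                // the whole bulk request failed (e.g. connection problems)
            }
        };

        return BulkProcessor.builder(consumer, listener)
                .setBulkActions(5000)                               // flush after 5000 actions
                .setBulkSize(new ByteSizeValue(1, ByteSizeUnit.GB)) // or after 1 GB of payload
                .setConcurrentRequests(6)                           // up to 6 bulk requests in flight
                .build();
    }
}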

I think the first thing to try is to upgrade: newer versions have a much more discerning backpressure mechanism for bulk requests.


Why such a large cluster?
These days, with things like CCS (cross-cluster search), we recommend running multiple smaller clusters.

This topic was automatically closed 28 days after the last reply. New replies are no longer allowed.