Problem
Here is the problem we observed: during bulk indexing, after a while, the semaphore is acquired in the internalAdd method of the asynchronous bulk request handler but is never released.
After some investigation, it appears that an OutOfMemoryError occurred while allocating a new DirectByteBuffer (that memory is never reclaimed because our application's JVM is started with the -XX:+DisableExplicitGC flag).
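To illustrate the deadlock, here is a simplified sketch of the permit life-cycle, assuming the pattern described above (this is not the actual org.elasticsearch.action.bulk.BulkRequestHandler source): the permit is released only from the ActionListener callbacks, so if the asynchronous send dies on the transport thread before either callback runs, the permit is lost and every subsequent add or flush blocks on the semaphore.

```java
import java.util.concurrent.Semaphore;
import org.elasticsearch.action.ActionListener;
import org.elasticsearch.action.bulk.BulkRequest;
import org.elasticsearch.action.bulk.BulkResponse;
import org.elasticsearch.client.Client;

// Simplified illustration only, not the real BulkRequestHandler code.
class PermitLeakSketch {
    private final Semaphore semaphore = new Semaphore(1); // concurrentRequests = 1

    void execute(Client client, BulkRequest bulkRequest) throws InterruptedException {
        semaphore.acquire(); // taken by the adding/flushing thread (which also holds the BulkProcessor lock, as in the dump below)
        client.bulk(bulkRequest, new ActionListener<BulkResponse>() {
            @Override
            public void onResponse(BulkResponse response) {
                semaphore.release(); // normal completion path
            }

            @Override
            public void onFailure(Exception e) {
                semaphore.release(); // failure path (where afterBulk/backoff handling would also happen)
            }
        });
        // If an OutOfMemoryError is thrown later on the netty I/O thread while
        // serialising or writing the request, neither callback above is invoked,
        // so the permit acquired here is never returned.
    }
}
```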
Here is a partial view of our thread dump:
"elasticsearch[...][bulk_processor][T#1]" - Thread t@63
java.lang.Thread.State: BLOCKED
at org.elasticsearch.action.bulk.BulkProcessor$Flush.run(BulkProcessor.java:342)
- waiting to lock <597aca4b> (a org.elasticsearch.action.bulk.BulkProcessor) owned by "Sync-7" t@71
"Sync-4" - Thread t@77
java.lang.Thread.State: BLOCKED
at org.elasticsearch.action.bulk.BulkProcessor.internalAdd(BulkProcessor.java:286)
- waiting to lock <597aca4b> (a org.elasticsearch.action.bulk.BulkProcessor) owned by "Sync-7" t@71
"Sync-7" - Thread t@71
java.lang.Thread.State: WAITING
at sun.misc.Unsafe.park(Native Method)
- parking to wait for <2d08aa0f> (a java.util.concurrent.Semaphore$NonfairSync)
at java.util.concurrent.locks.LockSupport.park(LockSupport.java:175)
at java.util.concurrent.locks.AbstractQueuedSynchronizer.parkAndCheckInterrupt(AbstractQueuedSynchronizer.java:836)
at java.util.concurrent.locks.AbstractQueuedSynchronizer.doAcquireSharedInterruptibly(AbstractQueuedSynchronizer.java:997)
at java.util.concurrent.locks.AbstractQueuedSynchronizer.acquireSharedInterruptibly(AbstractQueuedSynchronizer.java:1304)
at java.util.concurrent.Semaphore.acquire(Semaphore.java:312)
at org.elasticsearch.action.bulk.BulkRequestHandler$AsyncBulkRequestHandler.execute(BulkRequestHandler.java:121)
at org.elasticsearch.action.bulk.BulkProcessor.execute(BulkProcessor.java:315)
at org.elasticsearch.action.bulk.BulkProcessor.executeIfNeeded(BulkProcessor.java:306)
at org.elasticsearch.action.bulk.BulkProcessor.internalAdd(BulkProcessor.java:288)
- locked <597aca4b> (a org.elasticsearch.action.bulk.BulkProcessor)
Here is the stack trace found using JMC (Java Mission Control):
Stack Trace Count
java.lang.Throwable.&lt;init&gt;(String) 1
java.lang.Error.&lt;init&gt;(String) 1
java.lang.VirtualMachineError.&lt;init&gt;(String) 1
java.lang.OutOfMemoryError.&lt;init&gt;(String) 1
java.nio.Bits.reserveMemory(long, int) 1
java.nio.DirectByteBuffer.&lt;init&gt;(int) 1
java.nio.ByteBuffer.allocateDirect(int) 1
io.netty.buffer.PoolArena$DirectArena.allocateDirect(int) 1
io.netty.buffer.PoolArena$DirectArena.newUnpooledChunk(int) 1
io.netty.buffer.PoolArena.allocateHuge(PooledByteBuf, int) 1
io.netty.buffer.PoolArena.allocate(PoolThreadCache, PooledByteBuf, int) 1
io.netty.buffer.PoolArena.allocate(PoolThreadCache, int, int) 1
io.netty.buffer.PooledByteBufAllocator.newDirectBuffer(int, int) 1
io.netty.buffer.AbstractByteBufAllocator.directBuffer(int, int) 1
io.netty.buffer.AbstractByteBufAllocator.directBuffer(int) 1
io.netty.channel.nio.AbstractNioChannel.newDirectBuffer(ByteBuf) 1
io.netty.channel.nio.AbstractNioByteChannel.filterOutboundMessage(Object) 1
io.netty.channel.AbstractChannel$AbstractUnsafe.write(Object, ChannelPromise) 1
io.netty.channel.DefaultChannelPipeline$HeadContext.write(ChannelHandlerContext, Object, ChannelPromise) 1
io.netty.channel.AbstractChannelHandlerContext.invokeWrite0(Object, ChannelPromise) 1
io.netty.channel.AbstractChannelHandlerContext.invokeWrite(Object, ChannelPromise) 1
io.netty.channel.AbstractChannelHandlerContext.write(Object, boolean, ChannelPromise) 1
io.netty.channel.AbstractChannelHandlerContext.write(Object, ChannelPromise) 1
org.elasticsearch.transport.netty4.Netty4MessageChannelHandler.write(ChannelHandlerContext, Object, ChannelPromise) 1
io.netty.channel.AbstractChannelHandlerContext.invokeWrite0(Object, ChannelPromise) 1
io.netty.channel.AbstractChannelHandlerContext.invokeWrite(Object, ChannelPromise) 1
io.netty.channel.AbstractChannelHandlerContext.access$1900(AbstractChannelHandlerContext, Object, ChannelPromise) 1
io.netty.channel.AbstractChannelHandlerContext$AbstractWriteTask.write(AbstractChannelHandlerContext, Object, ChannelPromise) 1
io.netty.channel.AbstractChannelHandlerContext$WriteAndFlushTask.write(AbstractChannelHandlerContext, Object, ChannelPromise) 1
io.netty.channel.AbstractChannelHandlerContext$AbstractWriteTask.run() 1
io.netty.util.concurrent.AbstractEventExecutor.safeExecute(Runnable) 1
io.netty.util.concurrent.SingleThreadEventExecutor.runAllTasks(long) 1
io.netty.channel.nio.NioEventLoop.run() 1
io.netty.util.concurrent.SingleThreadEventExecutor$5.run() 1
java.lang.Thread.run() 1
More details
We index data using the bulk processor, with the following configuration (a minimal sketch of this setup in code follows the list):
- setBulkActions = 30000
- setBulkSize = 10 MB
- setFlushInterval = 10s
- setConcurrentRequests = 1
- setBackoffPolicy = 8 retries, 50 ms initial delay
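For reference, here is a minimal sketch of how a processor with this configuration might be built (the client variable, the empty listener body, and the exponential backoff flavour are assumptions, not our actual code):

```java
import org.elasticsearch.action.bulk.BackoffPolicy;
import org.elasticsearch.action.bulk.BulkProcessor;
import org.elasticsearch.action.bulk.BulkRequest;
import org.elasticsearch.action.bulk.BulkResponse;
import org.elasticsearch.client.Client;
import org.elasticsearch.common.unit.ByteSizeUnit;
import org.elasticsearch.common.unit.ByteSizeValue;
import org.elasticsearch.common.unit.TimeValue;

// client is any org.elasticsearch.client.Client instance (assumption for the sketch)
BulkProcessor bulkProcessor = BulkProcessor.builder(client, new BulkProcessor.Listener() {
        @Override
        public void beforeBulk(long executionId, BulkRequest request) { }

        @Override
        public void afterBulk(long executionId, BulkRequest request, BulkResponse response) { }

        @Override
        public void afterBulk(long executionId, BulkRequest request, Throwable failure) {
            // our expectation: this should eventually fire once the backoff retries are exhausted
        }
    })
    .setBulkActions(30000)                                // flush after 30,000 actions...
    .setBulkSize(new ByteSizeValue(10, ByteSizeUnit.MB))  // ...or after 10 MB...
    .setFlushInterval(TimeValue.timeValueSeconds(10))     // ...or every 10 seconds
    .setConcurrentRequests(1)                             // at most one in-flight bulk request
    .setBackoffPolicy(BackoffPolicy.exponentialBackoff(   // 8 retries, 50 ms initial delay
        TimeValue.timeValueMillis(50), 8))
    .build();
```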
Questions
- From my understanding, some messages are never sent to the cluster (at this point, I don't know which ones):
  - Why is the onFailure method of the ActionListener never called? The backoff policy should have timed out and triggered it: is that right?
- Given these observations, are there other errors we should worry about (i.e. cases where the semaphore will never be released)? If so, are there known workarounds for them?
- I did not find much discussion about this flag recommendation besides this thread. Do you recommend not using this flag? If so, this could be an addition to the current documentation (or did I miss it?).
- Is there any configuration on the Elasticsearch side to avoid this issue?
Workaround (?)
One way to solve this issue is to remove the -XX:+DisableExplicitGC flag, thus allowing the System.gc() call made inside java.nio.Bits to take effect. By also adding -XX:+ExplicitGCInvokesConcurrent, a concurrent CMS cycle is triggered instead of a stop-the-world full GC.
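To make the reasoning concrete, here is a rough paraphrase of what java.nio.Bits.reserveMemory does in JDK 8 (a simplified sketch from memory, not the actual JDK source; tryReserve is a placeholder for the real direct-memory accounting):

```java
// When the direct-memory budget is exhausted, Bits.reserveMemory falls back to
// System.gc() to reclaim unreferenced DirectByteBuffers. With -XX:+DisableExplicitGC
// that call is a no-op, so the retries keep failing and an
// OutOfMemoryError("Direct buffer memory") is eventually thrown.
static void reserveMemorySketch(long size) {
    if (tryReserve(size)) {
        return;                      // enough direct memory is available
    }
    System.gc();                     // no-op when -XX:+DisableExplicitGC is set
    for (long sleep = 1; sleep <= 256; sleep <<= 1) {
        if (tryReserve(size)) {
            return;                  // memory freed by collecting dead DirectByteBuffers
        }
        try {
            Thread.sleep(sleep);     // give reference processing some time to catch up
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            break;
        }
    }
    throw new OutOfMemoryError("Direct buffer memory");
}

// Placeholder for the real reservation bookkeeping in java.nio.Bits.
static boolean tryReserve(long size) {
    return false;
}
```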
The workaround itself has not been tested yet; we plan to test it today and will keep you informed.