We are using ES 1.7.1 as the server and the TransportClient in a Java application, and we index data with the bulk API. After some time the application hangs and becomes unresponsive.
The application has 256 worker threads.
Here is the relevant part of the thread dump.
Out of the 256 worker threads, 255 are blocked with the following stack trace:
```
"XNIO-1 task-" #325 prio=5 os_prio=0 tid=0x00007efe6405e800 nid=0x2eea waiting for monitor entry [0x00007effe91d0000]
   java.lang.Thread.State: BLOCKED (on object monitor)
    at org.elasticsearch.action.bulk.BulkProcessor.internalAdd(BulkProcessor.java:279)
    - waiting to lock <0x00000006e8579d90> (a org.elasticsearch.action.bulk.BulkProcessor)
    at org.elasticsearch.action.bulk.BulkProcessor.add(BulkProcessor.java:264)
```
All 255 threads are blocked trying to enter the synchronized internalAdd() method:
```java
private synchronized void internalAdd(ActionRequest request, @Nullable Object payload) {
    ensureOpen();
    bulkRequest.add(request, payload);
    executeIfNeeded();
}
```
The remaining worker thread is waiting with this stack trace:
```
"XNIO-1 task-218" #324 prio=5 os_prio=0 tid=0x00007efe5c060800 nid=0x2ee9 waiting on condition [0x00007effe92d1000]
   java.lang.Thread.State: WAITING (parking)
    at sun.misc.Unsafe.park(Native Method)
    - parking to wait for <0x00000006e8579e38> (a java.util.concurrent.Semaphore$NonfairSync)
    at java.util.concurrent.locks.LockSupport.park(LockSupport.java:175)
    at java.util.concurrent.locks.AbstractQueuedSynchronizer.parkAndCheckInterrupt(AbstractQueuedSynchronizer.java:836)
    at java.util.concurrent.locks.AbstractQueuedSynchronizer.doAcquireSharedInterruptibly(AbstractQueuedSynchronizer.java:997)
    at java.util.concurrent.locks.AbstractQueuedSynchronizer.acquireSharedInterruptibly(AbstractQueuedSynchronizer.java:1304)
    at java.util.concurrent.Semaphore.acquire(Semaphore.java:312)
    at org.elasticsearch.action.bulk.BulkProcessor.execute(BulkProcessor.java:326)
    at org.elasticsearch.action.bulk.BulkProcessor.executeIfNeeded(BulkProcessor.java:299)
    at org.elasticsearch.action.bulk.BulkProcessor.internalAdd(BulkProcessor.java:281)
    - locked <0x00000006e8579d90> (a org.elasticsearch.action.bulk.BulkProcessor)
    at org.elasticsearch.action.bulk.BulkProcessor.add(BulkProcessor.java:264)
```
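This thread already holds the BulkProcessor monitor <0x00000006e8579d90> (taken in the synchronized internalAdd()), which is the same monitor the other 255 threads are waiting for. It is trying to send the bulk request to Elasticsearch, but it is parked waiting for a permit from the semaphore.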
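The combination of a held monitor and a semaphore wait can be reproduced without Elasticsearch. The sketch below uses a hypothetical class, LockAndPermit (not the Elasticsearch class), whose synchronized add() parks in Semaphore.acquire() while still holding the monitor. The semaphore starts with zero permits, on the assumption that every permit is already held by an in-flight request whose callback has not arrived. The first caller ends up WAITING (parking) and the second BLOCKED, which matches the two kinds of stacks in the dump.

```java
import java.util.concurrent.Semaphore;
import java.util.concurrent.TimeUnit;

// Hypothetical stand-in for BulkProcessor (NOT the Elasticsearch class).
// It keeps only the shape of the problem: a synchronized method that can
// park on a semaphore while still holding the object's monitor.
class LockAndPermit {
    // Zero permits models "every permit is held by an in-flight request
    // whose callback never arrives".
    private final Semaphore semaphore = new Semaphore(0);

    synchronized void add() throws InterruptedException {
        semaphore.acquire(); // parks here while holding the monitor on 'this'
    }
}

class DeadlockRepro {
    private static void pause(long millis) {
        try {
            TimeUnit.MILLISECONDS.sleep(millis);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
    }

    /** Starts two callers of add() and returns their states once they settle. */
    static Thread.State[] run() {
        LockAndPermit processor = new LockAndPermit();
        Runnable caller = () -> {
            try {
                processor.add();
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt(); // only used for cleanup below
            }
        };
        Thread first = new Thread(caller, "worker-1");
        Thread second = new Thread(caller, "worker-2");
        first.setDaemon(true);
        second.setDaemon(true);
        first.start();
        pause(200); // worker-1 takes the monitor and parks in acquire()
        second.start();
        pause(200); // worker-2 is now stuck trying to enter add()
        Thread.State[] states = {first.getState(), second.getState()};
        first.interrupt();  // cleanup: worker-1 leaves acquire() and frees the monitor
        second.interrupt(); // worker-2 then enters add() and acquire() throws at once
        return states;
    }

    public static void main(String[] args) {
        Thread.State[] s = run();
        System.out.println(s[0] + " " + s[1]); // typically prints: WAITING BLOCKED
    }
}
```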
Here is the relevant excerpt from BulkProcessor.execute():
```java
boolean success = false;
try {
    listener.beforeBulk(executionId, bulkRequest);
    semaphore.acquire();
    client.bulk(bulkRequest, new ActionListener<BulkResponse>() {
        @Override
        public void onResponse(BulkResponse response) {
            try {
                listener.afterBulk(executionId, bulkRequest, response);
            } finally {
                semaphore.release();
            }
        }

        @Override
        public void onFailure(Throwable e) {
            try {
                listener.afterBulk(executionId, bulkRequest, e);
            } finally {
                semaphore.release();
            }
        }
    });
    success = true;
} catch (InterruptedException e) {
    Thread.interrupted();
    listener.afterBulk(executionId, bulkRequest, e);
} catch (Throwable t) {
    listener.afterBulk(executionId, bulkRequest, t);
} finally {
    if (!success) { // if we fail on client.bulk() release the semaphore
        semaphore.release();
    }
}
```
Before sending the bulk request, the thread acquires a permit from the semaphore and then calls client.bulk(). If client.bulk() itself throws, success stays false and the permit is released in the finally block.
I assume client.bulk(request, listener) is an asynchronous call: the calling thread only submits the request and returns, and the result arrives later on another thread through the ActionListener callback. When the response or failure arrives, onResponse() or onFailure() calls afterBulk() on the listener and releases the semaphore.
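The permit lifecycle described above can be sketched with plain java.util.concurrent classes. FakeAsyncClient is a hypothetical stand-in for client.bulk(request, listener): submit() returns immediately and runs the callback later on a pool thread, or throws synchronously for an empty request. PermitLifecycle.send() follows the same structure as execute(): the callback returns the permit on the normal path, and the finally block returns it only if submit() threw. One deliberate difference is that acquire() sits before the try block, so an interrupted acquire() never releases a permit it did not take.

```java
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Semaphore;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Consumer;

// Hypothetical async client standing in for client.bulk(request, listener):
// submit() returns at once and the callback later runs on a pool thread.
class FakeAsyncClient {
    private final ExecutorService pool = Executors.newFixedThreadPool(2);

    void submit(String request, Consumer<String> callback) {
        if (request.isEmpty()) {
            throw new IllegalArgumentException("empty request"); // fails before sending
        }
        pool.execute(() -> callback.accept("ok:" + request));
    }

    void shutdownAndWait() throws InterruptedException {
        pool.shutdown();
        pool.awaitTermination(5, TimeUnit.SECONDS);
    }
}

class PermitLifecycle {
    private final Semaphore semaphore;
    private final FakeAsyncClient client;
    final AtomicInteger responses = new AtomicInteger();
    final AtomicInteger failures = new AtomicInteger();

    PermitLifecycle(int concurrentRequests, FakeAsyncClient client) {
        this.semaphore = new Semaphore(concurrentRequests);
        this.client = client;
    }

    void send(String request) throws InterruptedException {
        semaphore.acquire(); // outside try: nothing to give back if this throws
        boolean success = false;
        try {
            client.submit(request, response -> {
                try {
                    responses.incrementAndGet(); // plays the role of afterBulk(...)
                } finally {
                    semaphore.release(); // normal path: the callback returns the permit
                }
            });
            success = true;
        } catch (IllegalArgumentException e) {
            failures.incrementAndGet();
        } finally {
            if (!success) {
                semaphore.release(); // submit() threw, so no callback will come
            }
        }
    }

    /** Returns {responses, failures, permits available at the end}. */
    static int[] run() {
        FakeAsyncClient client = new FakeAsyncClient();
        PermitLifecycle p = new PermitLifecycle(2, client);
        try {
            for (String r : new String[] {"doc-1", "doc-2", "", "doc-3", "doc-4"}) {
                p.send(r);
            }
            client.shutdownAndWait(); // every callback, and its release, has finished
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return new int[] {p.responses.get(), p.failures.get(), p.semaphore.availablePermits()};
    }
}
```

When every callback eventually fires, both permits are back at the end ({4, 1, 2}), no matter how many requests went through.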
What happens if Elasticsearch becomes unresponsive? Will the callback still be invoked? If it is not, the semaphore is never released: the thread holding the BulkProcessor lock waits forever in semaphore.acquire(), every other thread waits on that lock indefinitely, and the application becomes unresponsive.
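Whether the TransportClient eventually calls onFailure() for an unresponsive node is not something a sketch can answer, but the consequence of a missing callback is easy to check. SilentClient below is a hypothetical client that accepts every request and never calls back. After concurrentRequests sends, no permit ever returns, so a plain acquire() would park forever; tryAcquire(timeout) is used only so the demo can observe that without hanging.

```java
import java.util.concurrent.Semaphore;
import java.util.concurrent.TimeUnit;
import java.util.function.Consumer;

// Hypothetical client modelling an unresponsive cluster: it accepts each
// request and never invokes the callback (no response, no failure).
class SilentClient {
    void submit(String request, Consumer<String> callback) {
        // intentionally empty
    }
}

class PermitLeak {
    /**
     * Sends concurrentRequests requests through SilentClient, each of which
     * would release its permit in the callback, then checks whether one more
     * permit becomes available within waitMillis.
     */
    static boolean permitReturned(int concurrentRequests, long waitMillis) {
        Semaphore semaphore = new Semaphore(concurrentRequests);
        SilentClient client = new SilentClient();
        try {
            for (int i = 0; i < concurrentRequests; i++) {
                semaphore.acquire(); // succeeds: a permit is still free
                client.submit("bulk-" + i, response -> semaphore.release());
            }
            // The next plain acquire() would park forever; tryAcquire lets
            // the demo observe that no permit comes back.
            return semaphore.tryAcquire(waitMillis, TimeUnit.MILLISECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return false;
        }
    }

    public static void main(String[] args) {
        System.out.println(permitReturned(1, 200)); // prints false
    }
}
```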
I would really like the application to keep working, even if the metric data is not sent to ES.
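One way to keep the worker threads responsive regardless of what happens inside BulkProcessor is to stop calling bulkProcessor.add() from them directly. The sketch below, using hypothetical class names, puts a bounded ArrayBlockingQueue in between: workers call offer() with a short timeout and drop the metric if the queue stays full, while a single daemon thread drains the queue into a sink (for example, a lambda that calls bulkProcessor.add(...)). If the sink hangs, only the drainer thread is stuck.

```java
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;
import java.util.function.Consumer;

// Hypothetical buffer between the worker threads and the bulk indexer.
class DroppingMetricBuffer<T> {
    private final BlockingQueue<T> queue;
    private final AtomicLong dropped = new AtomicLong();

    DroppingMetricBuffer(int capacity, Consumer<T> sink) {
        this.queue = new ArrayBlockingQueue<>(capacity);
        Thread drainer = new Thread(() -> {
            try {
                while (!Thread.currentThread().isInterrupted()) {
                    sink.accept(queue.take()); // only this thread can get stuck
                }
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt(); // exit quietly
            }
        }, "metric-drainer");
        drainer.setDaemon(true);
        drainer.start();
    }

    /** Returns false, counting a drop, if the queue stays full for timeoutMillis. */
    boolean offer(T item, long timeoutMillis) {
        try {
            if (queue.offer(item, timeoutMillis, TimeUnit.MILLISECONDS)) {
                return true;
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        dropped.incrementAndGet();
        return false;
    }

    long droppedCount() {
        return dropped.get();
    }
}

class BufferDemo {
    /** Offers 10 items to a buffer whose sink never returns; returns the drop count. */
    static long run() {
        CountDownLatch never = new CountDownLatch(1);
        DroppingMetricBuffer<String> buffer = new DroppingMetricBuffer<>(2, item -> {
            try {
                never.await(); // simulates bulkProcessor.add(...) stuck forever
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        });
        for (int i = 0; i < 10; i++) {
            buffer.offer("metric-" + i, 10);
        }
        return buffer.droppedCount();
    }
}
```

With a capacity of 2 and a sink that never returns, at most three items are accepted (two queued, one held by the drainer), so 7 or 8 of the 10 are dropped, and the calling loop still finishes. Dropping is the trade-off: data is lost instead of threads.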
Instead of waiting indefinitely for the semaphore, would it be possible to try to acquire it with a timeout, so that no thread is blocked forever?
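The timed-acquire idea can be sketched with Semaphore.tryAcquire(long, TimeUnit). This is not an option of BulkProcessor; it shows what the idea could look like in your own wrapper or in a modified copy of the class. TimedPermitSender and AsyncSink are hypothetical names, and AsyncSink.send() is assumed to call its done callback exactly once, on success or on failure.

```java
import java.util.concurrent.Semaphore;
import java.util.concurrent.TimeUnit;

// Hypothetical async transport: must call done.run() exactly once,
// on success or on failure (e.g. from onResponse/onFailure).
interface AsyncSink {
    void send(String batch, Runnable done);
}

// Hypothetical wrapper, NOT a BulkProcessor option: bounds the wait for a
// concurrency permit instead of parking forever in Semaphore.acquire().
class TimedPermitSender {
    private final Semaphore permits;
    private final AsyncSink sink;
    private final long timeoutMillis;

    TimedPermitSender(int concurrentRequests, long timeoutMillis, AsyncSink sink) {
        this.permits = new Semaphore(concurrentRequests);
        this.timeoutMillis = timeoutMillis;
        this.sink = sink;
    }

    /** Returns false, dropping the batch, if no permit frees up in time. */
    boolean trySend(String batch) {
        try {
            if (!permits.tryAcquire(timeoutMillis, TimeUnit.MILLISECONDS)) {
                return false; // give up instead of blocking the caller
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return false;
        }
        boolean handedOff = false;
        try {
            sink.send(batch, permits::release); // the callback gives the permit back
            handedOff = true;
        } finally {
            if (!handedOff) {
                permits.release(); // send() threw, so no callback will come
            }
        }
        return true;
    }

    static boolean[] demo() {
        // An unresponsive sink that never calls done, like a silent cluster.
        TimedPermitSender sender = new TimedPermitSender(1, 50, (batch, done) -> { });
        return new boolean[] {sender.trySend("a"), sender.trySend("b"), sender.trySend("c")};
    }
}
```

A timeout keeps caller threads alive, but it does not bring back permits held by requests that never complete: in the demo the first batch goes out and every later trySend() returns false after 50 ms. Also, in BulkProcessor the wait happens while holding the object monitor, so a wrapper like this should avoid holding its own lock while it calls tryAcquire().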
I am really stuck on this issue. Please help.
Thanks,
Paul