Java application using BulkProcessor hangs if Elasticsearch hangs

We are using ES 1.7.1 as the server and TransportClient in a Java application. The application hangs after some time and becomes unresponsive. We are indexing data using the bulk API.

The application has 256 worker threads.

Here is the relevant information from the thread dump.

Out of 256 worker threads, 255 are blocked with the following stack trace.

"XNIO-1 task-" #325 prio=5 os_prio=0 tid=0x00007efe6405e800 nid=0x2eea waiting for monitor entry [0x00007effe91d0000]
   java.lang.Thread.State: BLOCKED (on object monitor)
    at org.elasticsearch.action.bulk.BulkProcessor.internalAdd(BulkProcessor.java:279)
    - waiting to lock <0x00000006e8579d90> (a org.elasticsearch.action.bulk.BulkProcessor)
    at org.elasticsearch.action.bulk.BulkProcessor.add(BulkProcessor.java:264)

All 255 threads are blocked on the synchronized internalAdd() method:

private synchronized void internalAdd(ActionRequest request, @Nullable Object payload) {
    ensureOpen();
    bulkRequest.add(request, payload);
    executeIfNeeded();
}

The remaining worker thread is waiting with the following dump.

"XNIO-1 task-218" #324 prio=5 os_prio=0 tid=0x00007efe5c060800 nid=0x2ee9 waiting on condition [0x00007effe92d1000]
   java.lang.Thread.State: WAITING (parking)
    at sun.misc.Unsafe.park(Native Method)
    - parking to wait for <0x00000006e8579e38> (a java.util.concurrent.Semaphore$NonfairSync)
    at java.util.concurrent.locks.LockSupport.park(LockSupport.java:175)
    at java.util.concurrent.locks.AbstractQueuedSynchronizer.parkAndCheckInterrupt(AbstractQueuedSynchronizer.java:836)
    at java.util.concurrent.locks.AbstractQueuedSynchronizer.doAcquireSharedInterruptibly(AbstractQueuedSynchronizer.java:997)
    at java.util.concurrent.locks.AbstractQueuedSynchronizer.acquireSharedInterruptibly(AbstractQueuedSynchronizer.java:1304)
    at java.util.concurrent.Semaphore.acquire(Semaphore.java:312)
    at org.elasticsearch.action.bulk.BulkProcessor.execute(BulkProcessor.java:326)
    at org.elasticsearch.action.bulk.BulkProcessor.executeIfNeeded(BulkProcessor.java:299)
    at org.elasticsearch.action.bulk.BulkProcessor.internalAdd(BulkProcessor.java:281)
    - locked <0x00000006e8579d90> (a org.elasticsearch.action.bulk.BulkProcessor)
    at org.elasticsearch.action.bulk.BulkProcessor.add(BulkProcessor.java:264)

This thread has already acquired the BulkProcessor lock in internalAdd() and is trying to send the request to Elasticsearch. It is waiting to acquire the semaphore.

Here is the code excerpt from execute():

try {
    listener.beforeBulk(executionId, bulkRequest);
    semaphore.acquire();
    client.bulk(bulkRequest, new ActionListener<BulkResponse>() {
        @Override
        public void onResponse(BulkResponse response) {
            try {
                listener.afterBulk(executionId, bulkRequest, response);
            } finally {
                semaphore.release();
            }
        }

        @Override
        public void onFailure(Throwable e) {
            try {
                listener.afterBulk(executionId, bulkRequest, e);
            } finally {
                semaphore.release();
            }
        }
    });
    success = true;
} catch (InterruptedException e) {
    Thread.interrupted();
    listener.afterBulk(executionId, bulkRequest, e);
} catch (Throwable t) {
    listener.afterBulk(executionId, bulkRequest, t);
} finally {
    if (!success) {  // if we fail on client.bulk() release the semaphore
        semaphore.release();
    }
}

Before sending the bulk request, the thread acquires a permit from the semaphore. Once it has the permit, client.bulk() is called. If any error happens during client.bulk(), the semaphore is released in the finally block.
I assume client.bulk(req, callback) is an asynchronous call: the thread just invokes client.bulk() and returns, and the result arrives in the callback on another thread. Once the result comes back, the appropriate listener method is called and the semaphore is released.

What would happen if Elasticsearch is unresponsive? Will the callback be invoked? If the callback is never invoked, the semaphore is never released, and all application threads wait on the BulkProcessor indefinitely. The application becomes unresponsive.
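The failure mode above can be reduced to a JDK-only sketch (class and method names are hypothetical). One permit is held by an in-flight request whose callback never fires; the next caller then waits for a permit while holding the BulkProcessor monitor, and every later caller blocks on that monitor. The sketch uses a bounded tryAcquire() in place of the real blocking acquire() so the demonstration terminates:

```java
import java.util.concurrent.Semaphore;
import java.util.concurrent.TimeUnit;

// Hypothetical sketch of the hang described above (not real BulkProcessor code).
class HangSketch {
    private final Semaphore inFlight = new Semaphore(1); // concurrentRequests = 1

    // Stands in for the synchronized internalAdd(): while one caller waits for
    // a permit here, every other caller blocks on this object's monitor.
    synchronized boolean add(long timeoutMillis) {
        try {
            return inFlight.tryAcquire(timeoutMillis, TimeUnit.MILLISECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return false;
        }
    }

    // Simulates a bulk request whose response callback never arrives:
    // the permit is taken and never released.
    void requestSentButCallbackNeverFires() {
        inFlight.acquireUninterruptibly();
    }
}
```

With the real acquire(), add() would park forever instead of returning false, which matches the WAITING thread in the dump above.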

I would really like the application to continue working, even if the metric data is not sent to ES.

Instead of waiting indefinitely to acquire the semaphore, would it be possible to try to acquire it with a timeout, so that no thread is blocked indefinitely?

I am really stuck now with this issue. Please help.

Thanks,
Paul

Instead of doing:

semaphore.acquire();

could we do something like this:

if (!semaphore.tryAcquire(2, TimeUnit.SECONDS)) {
    listener.afterBulk(executionId, bulkRequest,
            new RuntimeException("Elasticsearch may not be responding."));
    return;
}

It would be great if the timeout could be configurable.
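As a rough illustration of what a configurable timeout might look like (plain JDK, hypothetical names, not actual BulkProcessor code):

```java
import java.util.concurrent.Semaphore;
import java.util.concurrent.TimeUnit;

// Hypothetical wrapper: the tryAcquire() from the snippet above, with the
// timeout supplied as a setting instead of being hardcoded.
class TimedGate {
    private final Semaphore semaphore;
    private final long acquireTimeoutMillis;

    TimedGate(int permits, long acquireTimeoutMillis) {
        this.semaphore = new Semaphore(permits);
        this.acquireTimeoutMillis = acquireTimeoutMillis;
    }

    // Returns false instead of blocking forever when no permit frees up in time.
    boolean tryEnter() {
        try {
            return semaphore.tryAcquire(acquireTimeoutMillis, TimeUnit.MILLISECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return false;
        }
    }

    void exit() {
        semaphore.release();
    }
}
```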

Another question: isn't it OK to release the semaphore even before invoking the listener method?

Is there any reason not to use tryAcquire() instead of acquire()?

Anyone, please...

I think you are trying to fix the trouble on the wrong side.

The BulkProcessor code makes some unwritten assumptions.

The first assumption is that bulk requests are spread over many nodes in the cluster. So if one node enters a state where it can never respond, the bulk can continue (as long as the cluster permits).

The second assumption is about the semaphore. Taking a semaphore is an absolute barrier: more threads than configured must never be allowed to execute bulk requests, at any price. Therefore, subsequent bulk requests must stall in an overflow condition, since something serious is happening on the cluster side.

BulkProcessor should not be used against single-node clusters unless it is quite clear it cannot block (in test environments, for example).

In my view, using tryAcquire() would be purely cosmetic and would not fix any cause on the cluster side. There is always a reason why a cluster does not respond, and that has to be fixed. The same goes for timeout settings: timeout values are often printed in the logs, and users tend to worry about them. But extending timeouts to overcome massive GC pauses on cluster nodes, for example, does not fix anything; it delays everything and makes the ES nodes perform worse. Often, adding a node to increase cluster capacity is the solution.

Jörg Prante,

Thank you for responding. BulkProcessor is a client-side class used to send bulk requests to Elasticsearch. Usually, sending metric data is not as important as the core responsibility of the application; it is a nice-to-have feature.

Do you mean BulkProcessor can only be used in a clustered environment? If I have a single-node installation, should I make an index call for each document instead?

The problem here is that BulkProcessor hangs the thread indefinitely. Once one worker thread hangs on the semaphore, all the other worker threads hang as well (in the synchronized BulkProcessor.internalAdd() method). Once Elasticsearch becomes unresponsive, your whole application becomes unresponsive. Even restarting the Elasticsearch server does not help; you have to restart your application.

One way to mitigate this issue is to make BulkProcessor.add() asynchronous: put an ExecutorService in between and let the worker threads hand the metric data to the executor, which adds it to the BulkProcessor. This way, the worker threads never hang on the BulkProcessor.
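A minimal sketch of that decoupling, using only the JDK (names are hypothetical, and a plain Runnable stands in for the bulkProcessor.add() call):

```java
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

// Hypothetical sketch: worker threads hand indexing work to a single
// background thread through a bounded queue and never block themselves.
class AsyncIndexer {
    private final ThreadPoolExecutor executor;

    AsyncIndexer(int queueCapacity) {
        // One background thread; the default AbortPolicy rejects when full.
        executor = new ThreadPoolExecutor(1, 1, 0L, TimeUnit.MILLISECONDS,
                new ArrayBlockingQueue<Runnable>(queueCapacity));
    }

    // bulkAdd stands in for a call like bulkProcessor.add(request).
    // Returns false (drops the metric) instead of ever blocking the caller.
    boolean submit(Runnable bulkAdd) {
        try {
            executor.execute(bulkAdd);
            return true;
        } catch (RejectedExecutionException e) {
            return false;
        }
    }

    void shutdown() {
        executor.shutdownNow();
    }
}
```

The default AbortPolicy makes execute() throw when the queue is full, which submit() turns into a dropped metric instead of a blocked worker thread.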

But this does not fully solve the problem. If Elasticsearch becomes unresponsive, the semaphore permit is lost and the executor thread hangs indefinitely. Even if we restart Elasticsearch, the executor thread is still hung. The only way to recover is to restart the application.

Thank you for replying.

BulkProcessor sends concurrent requests by default. If you have a single cluster node, this node must be capable of processing all requests or BulkProcessor will hang; that is by design, because there is no backup node.

You can switch to concurrency level 1 so all requests are synchronous, but that does not fix any cause.
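For reference, the concurrency level is configured on the BulkProcessor builder. This is a sketch against the 1.x Java client API (not tested here); if I recall the Javadoc correctly, setConcurrentRequests(0) makes each bulk execute synchronously on the calling thread, while higher values set the number of concurrent in-flight bulks (the semaphore size discussed above):

```java
// Sketch only: builder-based BulkProcessor setup with the concurrency level set.
BulkProcessor bulkProcessor = BulkProcessor.builder(client,
        new BulkProcessor.Listener() {
            @Override
            public void beforeBulk(long executionId, BulkRequest request) { }

            @Override
            public void afterBulk(long executionId, BulkRequest request, BulkResponse response) { }

            @Override
            public void afterBulk(long executionId, BulkRequest request, Throwable failure) { }
        })
        .setBulkActions(1000)
        .setConcurrentRequests(1) // or 0 for fully synchronous execution
        .build();
```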

I never saw the semaphore not counting down after an error.

As I said, if the cluster hangs, you should fix the cluster and not the BulkProcessor. Can you check the cluster logs for errors? Or even give a recipe for how to hang a cluster so the error can be reproduced?

In my case, the Elasticsearch server hung when running with default settings. The heap size was just 1 GB, and we were sending 15 bulk requests per second with 1000 actions each. After a long time (30+ hours) the Elasticsearch server hung, and I could see an OutOfMemory error in the console.

Since increasing the heap size to 16g, I have not seen this issue.

Another easy way to reproduce the issue would be to use Burp Suite as a proxy to intercept the response from the Elasticsearch server and not forward it to the application. This easily reproduces the scenario.

Instead of doing the following in BulkProcessor.execute():

semaphore.acquire();
client.bulk(bulkRequest, new ActionListener<BulkResponse>() {
    @Override
    public void onResponse(BulkResponse response) {
        try {
            listener.afterBulk(executionId, bulkRequest, response);
        } finally {
            semaphore.release();
        }
    }

    @Override
    public void onFailure(Throwable e) {
        try {
            listener.afterBulk(executionId, bulkRequest, e);
        } finally {
            semaphore.release();
        }
    }
});

can we do something like this:

semaphore.acquire();
final ListenableActionFuture<BulkResponse> listenableActionFuture =
        new BulkRequestBuilder(client, bulkRequest).execute();
listenableActionFuture.addListener(new ActionListener<BulkResponse>() {
    @Override
    public void onResponse(BulkResponse response) {
        try {
            listener.afterBulk(executionId, bulkRequest, response);
        } finally {
            semaphore.release();
        }
    }

    @Override
    public void onFailure(Throwable e) {
        try {
            listener.afterBulk(executionId, bulkRequest, e);
        } finally {
            semaphore.release();
        }
    }
});
executor.execute(new Runnable() {
    @Override
    public void run() {
        try {
            listenableActionFuture.get(5000, TimeUnit.MILLISECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        } catch (ExecutionException e) {
            // failure is already handled by onFailure() above
        } catch (TimeoutException e) {
            if (listenableActionFuture.cancel(true)) {
                semaphore.release();
                System.out.println("Bulk request failed with timeout....");
            }
        }
    }
});

The timeout here is not on semaphore.acquire(); it is on the client.bulk() call itself. If the response has not arrived within the specified timeout, cancel the request, release the semaphore, and log the cancelled bulk request. This way, the application threads are not blocked even if the Elasticsearch server is not responding.

What do you think?

Thanks again for the reply.

PS: there is no BulkRequestBuilder constructor that takes a Client and a BulkRequest, so I added the following constructor.

public BulkRequestBuilder(Client client, BulkRequest bulkRequest) {
    super(client, bulkRequest);
}

And why doesn't prepareBulk() accept a BulkRequest, or a list of actions, in an add() method? Is there an easy way to pass an already existing bulkRequest to a BulkRequestBuilder created by the prepareBulk() API?

If there is an error, the semaphore is released. The semaphore is not released only when the callback passed to client.bulk() is never invoked with either a success or a failure result.

I agree the cluster should be fixed. But sending metric data is a non-functional aspect of an application: the working of your application/API is more important than the metric data itself. I would not mind some metric data being dropped if Elasticsearch is not responding (i.e., a bulk request does not succeed within a timeout), but it should not affect the application functionality itself.

Thanks,
Paul

Apologies for resurrecting this old thread, but this is exactly the issue we observed during a host migration with Elasticsearch 1.7.1.

In our situation we erroneously migrated to hosts with larger disk space but less memory, and ran into OOM issues which subsequently caused many bulk index requests to block forever. Shouldn't the bulk requests adhere to server operation timeouts, as opposed to blocking forever, or is my understanding incorrect?
