The latch in BulkRequestHandler was added earlier to ensure that the handler executes a bulk request and waits for its completion when the BulkProcessor is configured with a concurrentRequests setting of 0. The related issue is "Simplify BulkProcessor handling and retry logic".
However, the only code that currently calls into this handler, which is itself blocked by synchronized methods, is removed by this change.
The latch is therefore no longer necessary in this handler. Should we consider removing this logic and simplifying the execute method?
public void execute(BulkRequest bulkRequest, long executionId) {
    ......
    semaphore.acquire();
    toRelease = semaphore::release;
    CountDownLatch latch = new CountDownLatch(1);
    retry.withBackoff(consumer, bulkRequest, ActionListener.runAfter(new ActionListener<BulkResponse>() {
        @Override
        public void onResponse(BulkResponse response) {
            listener.afterBulk(executionId, bulkRequest, response);
        }

        @Override
        public void onFailure(Exception e) {
            listener.afterBulk(executionId, bulkRequest, e);
        }
    }, () -> {
        semaphore.release();
        latch.countDown();
    }));
    bulkRequestSetupSuccessful = true;
    if (concurrentRequests == 0) {
        latch.await();
    }
    ......
}
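
To make the role of the latch concrete, here is a minimal standalone sketch of the same pattern. It is not Elasticsearch code: `LatchSketch`, its `execute(Runnable)` signature, and the `waitOn` helper are hypothetical, `CompletableFuture.runAsync` stands in for `retry.withBackoff`, and sizing the semaphore to one permit when `concurrentRequests` is 0 is an assumption made for illustration. It only shows that `execute` blocks until completion when `concurrentRequests` is 0 and returns immediately otherwise.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Semaphore;
import java.util.concurrent.atomic.AtomicBoolean;

// Hypothetical stand-in for the handler; models only the semaphore/latch interplay.
public class LatchSketch {
    private final Semaphore semaphore;
    private final int concurrentRequests;

    public LatchSketch(int concurrentRequests) {
        this.concurrentRequests = concurrentRequests;
        // Assumption: a setting of 0 still needs one permit to run a single request.
        this.semaphore = new Semaphore(concurrentRequests > 0 ? concurrentRequests : 1);
    }

    /**
     * Runs the task asynchronously and reports whether it had already
     * completed at the moment this method returned.
     */
    public boolean execute(Runnable task) {
        AtomicBoolean done = new AtomicBoolean(false);
        try {
            semaphore.acquire();
            CountDownLatch latch = new CountDownLatch(1);
            // Stand-in for retry.withBackoff(...): completes on another thread.
            CompletableFuture.runAsync(task).whenComplete((result, error) -> {
                done.set(true);
                semaphore.release();
                latch.countDown();
            });
            if (concurrentRequests == 0) {
                // Synchronous mode: block the caller until the request finishes.
                latch.await();
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return done.get();
    }

    /** Helper: a task that blocks until the given gate is opened. */
    public static Runnable waitOn(CountDownLatch gate) {
        return () -> {
            try {
                gate.await();
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        };
    }

    public static void main(String[] args) {
        // concurrentRequests == 0: execute waits, so the task is done on return.
        System.out.println(new LatchSketch(0).execute(() -> {}));

        // concurrentRequests == 1: execute returns while the task is still gated.
        CountDownLatch gate = new CountDownLatch(1);
        System.out.println(new LatchSketch(1).execute(waitOn(gate)));
        gate.countDown();
    }
}
```

If no remaining caller relies on the blocking behavior, dropping the latch from this sketch amounts to deleting the `CountDownLatch` and the `concurrentRequests == 0` branch, leaving the semaphore as the only coordination.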