Java application using BulkProcessor hangs when "circuit_breaking_exception" occurs

We are using Elasticsearch 7.5.1 as the server and RestHighLevelClient in a Java application, and we index data through the bulk API.
After a circuit_breaking_exception, the Java application hangs and becomes unresponsive.
Thread dump of the blocked indexing thread:

Name: OracleFileDecoder115
State: WAITING on java.util.concurrent.Semaphore$NonfairSync@3aa1448
Total blocked: 4 Total waited: 93



Stack trace:
sun.misc.Unsafe.park(Native Method)
java.util.concurrent.locks.LockSupport.park(LockSupport.java:175)
java.util.concurrent.locks.AbstractQueuedSynchronizer.parkAndCheckInterrupt(AbstractQueuedSynchronizer.java:836)
java.util.concurrent.locks.AbstractQueuedSynchronizer.doAcquireSharedInterruptibly(AbstractQueuedSynchronizer.java:997)
java.util.concurrent.locks.AbstractQueuedSynchronizer.acquireSharedInterruptibly(AbstractQueuedSynchronizer.java:1304)
java.util.concurrent.Semaphore.acquire(Semaphore.java:312)
org.elasticsearch.action.bulk.BulkRequestHandler.execute(BulkRequestHandler.java:59)
org.elasticsearch.action.bulk.BulkProcessor.execute(BulkProcessor.java:455)
org.elasticsearch.action.bulk.BulkProcessor.internalAdd(BulkProcessor.java:389)
org.elasticsearch.action.bulk.BulkProcessor.add(BulkProcessor.java:361)
org.elasticsearch.action.bulk.BulkProcessor.add(BulkProcessor.java:347)
com.rt.jio.indexer.indexing.product.OracleFileDecoder.readCSVFilesData(OracleFileDecoder.java:470)
com.rt.jio.indexer.indexing.product.OracleFileDecoder.run(OracleFileDecoder.java:175)
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
java.lang.Thread.run(Thread.java:748)
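
To illustrate what the dump above seems to show, here is a minimal sketch of a thread stalling on a semaphore whose permit is never handed back. It uses only java.util.concurrent, not Elasticsearch code. It assumes (this is our guess, not something we have confirmed in the BulkProcessor source) that one permit is taken per in-flight bulk request and is only released when the completion callback fires. The class and method names are hypothetical.

```java
import java.util.concurrent.Semaphore;
import java.util.concurrent.TimeUnit;

// Illustration only: plain java.util.concurrent, no Elasticsearch classes.
class PermitLeakDemo {

    /**
     * Simulates a sender that takes one permit per request and relies on a
     * completion callback to give it back. If callbackFires is false, the
     * permit is never returned (the assumed failure mode).
     * Returns true if a second request obtains a permit within the timeout.
     */
    static boolean secondRequestProceeds(boolean callbackFires, long timeoutMillis) {
        Semaphore permits = new Semaphore(1); // one concurrent request allowed
        try {
            // First request: acquire, "send", release only from the callback.
            permits.acquire();
            Runnable completionCallback = permits::release;
            if (callbackFires) {
                completionCallback.run();
            }

            // Second request: a bare acquire() would park indefinitely if the
            // permit leaked; tryAcquire with a timeout makes the stall visible.
            boolean acquired = permits.tryAcquire(timeoutMillis, TimeUnit.MILLISECONDS);
            if (acquired) {
                permits.release();
            }
            return acquired;
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt(); // preserve the interrupt flag
            throw new IllegalStateException("interrupted while waiting for a permit", e);
        }
    }

    public static void main(String[] args) {
        System.out.println("callback fired:   " + secondRequestProceeds(true, 200));
        System.out.println("callback missing: " + secondRequestProceeds(false, 200));
    }
}
```

In our application the waiting call is the untimed Semaphore.acquire seen in the trace, so the thread never returns once it gets into this state.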

No listener callbacks are received after the circuit_breaking_exception (CBE).

Code that creates the BulkProcessor:

public boolean createBulkConnection() {
    boolean processorFlag = false;
    try {
        ByteSizeValue size = new ByteSizeValue(AtomIndexerConfigParamsEnum.ES_BULK_SIZE.getIntValue(), ByteSizeUnit.KB);

        BulkProcessor.Listener listener = new BulkProcessor.Listener() {
            @Override
            public void beforeBulk(long executionId, BulkRequest request) {
            }

            // Called when the bulk request completed; individual items may still have failed.
            @Override
            public void afterBulk(long executionId, BulkRequest request, BulkResponse response) {
                try {
                    if (response.hasFailures()) {
                        logger.error("********************************************BULK FAILURE******************************************");
                    } else {
                        logger.debug(
                                "******************************************** AFTER BULK SUCCESS******************************************");
                    }
                } catch (Exception e) {
                    logger.error(e.getMessage(), e);
                }
            }

            // Called when the whole bulk request failed with an exception.
            @Override
            public void afterBulk(long executionId, BulkRequest request, Throwable failure) {
                try {
                    logger.error("********************************************BULK FAILURE THROWABLE******************************************");
                    logger.error("---------------Exception in bulk insertion-----------------------", failure);
                } catch (Exception e) {
                    logger.error(e.getMessage(), e);
                }
            }
        };

        BulkProcessor.Builder builder = BulkProcessor.builder(
                (request, bulkListener) -> clientRest.bulkAsync(request, RequestOptions.DEFAULT, bulkListener),
                listener);
        builder.setBulkActions(AtomIndexerConfigParamsEnum.ES_Bulk_Action.getIntValue()); // 7500
        builder.setBulkSize(size);
        builder.setConcurrentRequests(AtomIndexerConfigParamsEnum.ES_Concurrent_Requests.getIntValue());
        builder.setFlushInterval(
                TimeValue.timeValueSeconds(AtomIndexerConfigParamsEnum.ES_Flush_Interval.getIntValue()));
        builder.setBackoffPolicy(BackoffPolicy.constantBackoff(TimeValue.timeValueSeconds(1L),
                AtomIndexerConfigParamsEnum.ES_Max_No_Of_Retries_BackoffPolicy.getIntValue()));

        processor = builder.build();
    } catch (Exception e) {
        logger.error(e.getMessage(), e);
    }
    if (processor != null) {
        processorFlag = true;
    }
    return processorFlag;
}
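
To check whether afterBulk really never arrives for some executions, a small tracker like the following could be called from the listener. This is only a sketch: InFlightBulkTracker and its methods are hypothetical names, not part of the Elasticsearch client, and the timestamps are passed in explicitly (for example from System.nanoTime()) so the logic is easy to test.

```java
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.stream.Collectors;

// Hypothetical diagnostic helper; not part of the Elasticsearch client.
class InFlightBulkTracker {

    // executionId -> time (in nanoseconds) at which beforeBulk was seen
    private final Map<Long, Long> startedAtNanos = new ConcurrentHashMap<>();

    // Intended to be called from BulkProcessor.Listener#beforeBulk.
    void onBefore(long executionId, long nowNanos) {
        startedAtNanos.put(executionId, nowNanos);
    }

    // Intended to be called from both afterBulk overloads.
    void onAfter(long executionId) {
        startedAtNanos.remove(executionId);
    }

    // Execution ids that have been waiting for afterBulk longer than maxAgeNanos.
    List<Long> stalled(long nowNanos, long maxAgeNanos) {
        return startedAtNanos.entrySet().stream()
                .filter(e -> nowNanos - e.getValue() > maxAgeNanos)
                .map(Map.Entry::getKey)
                .sorted()
                .collect(Collectors.toList());
    }

    public static void main(String[] args) {
        InFlightBulkTracker tracker = new InFlightBulkTracker();
        tracker.onBefore(1L, 0L);
        tracker.onBefore(2L, 0L);
        tracker.onAfter(1L); // execution 1 completed, execution 2 never did
        System.out.println("stalled executions: " + tracker.stalled(10L, 5L));
    }
}
```

Logging stalled(...) periodically from a separate thread would show which execution ids were started but never completed after the circuit_breaking_exception.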

Bulk request:

if (ESConnection.getInstance().getBulkConnection() != null) {
    ESConnection.getInstance().getBulkConnection()
            .add(new IndexRequest(indexAlias, "_doc").source(json).id(id));
}

Since the circuit_breaking_exception occurred, our entire application has stopped working and we are stuck. Please help us understand how we can resolve this.

Regards,
Vinay Gayki
