We are using Elasticsearch 7.5.1 as the server and the RestHighLevelClient in a Java application.
The application hangs and becomes unresponsive after a circuit_breaking_exception.
We are indexing data with the bulk API (via BulkProcessor).
Thread dump of the blocked indexing thread:
Name: OracleFileDecoder115
State: WAITING on java.util.concurrent.Semaphore$NonfairSync@3aa1448
Total blocked: 4 Total waited: 93
Stack trace:
sun.misc.Unsafe.park(Native Method)
java.util.concurrent.locks.LockSupport.park(LockSupport.java:175)
java.util.concurrent.locks.AbstractQueuedSynchronizer.parkAndCheckInterrupt(AbstractQueuedSynchronizer.java:836)
java.util.concurrent.locks.AbstractQueuedSynchronizer.doAcquireSharedInterruptibly(AbstractQueuedSynchronizer.java:997)
java.util.concurrent.locks.AbstractQueuedSynchronizer.acquireSharedInterruptibly(AbstractQueuedSynchronizer.java:1304)
java.util.concurrent.Semaphore.acquire(Semaphore.java:312)
org.elasticsearch.action.bulk.BulkRequestHandler.execute(BulkRequestHandler.java:59)
org.elasticsearch.action.bulk.BulkProcessor.execute(BulkProcessor.java:455)
org.elasticsearch.action.bulk.BulkProcessor.internalAdd(BulkProcessor.java:389)
org.elasticsearch.action.bulk.BulkProcessor.add(BulkProcessor.java:361)
org.elasticsearch.action.bulk.BulkProcessor.add(BulkProcessor.java:347)
com.rt.jio.indexer.indexing.product.OracleFileDecoder.readCSVFilesData(OracleFileDecoder.java:470)
com.rt.jio.indexer.indexing.product.OracleFileDecoder.run(OracleFileDecoder.java:175)
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
java.lang.Thread.run(Thread.java:748)
We receive no afterBulk callback after the circuit_breaking_exception (CBE). As far as we understand, the concurrent-request permit acquired in BulkRequestHandler.execute is only released once the internal bulk listener completes, so when no callback arrives the permit is never returned and every subsequent BulkProcessor.add() blocks on the semaphore, which matches the thread dump above.
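To make sure we are reading the thread dump correctly, here is a minimal stand-alone illustration of the pattern we believe we are hitting. This is our own demo code, not Elasticsearch's; the permit count just mirrors a small ES_Concurrent_Requests setting.

import java.util.concurrent.Semaphore;

// Minimal illustration of what we think is happening (not Elasticsearch code):
// BulkRequestHandler guards concurrent bulks with a Semaphore and releases a
// permit only when the async bulk completes. If the completion callback never
// fires, the next acquire() parks forever -- the state our thread is stuck in.
public class SemaphoreStallDemo {
    public static void main(String[] args) throws InterruptedException {
        Semaphore permits = new Semaphore(1);   // pretend ES_Concurrent_Requests = 1

        permits.acquire();                      // first bulk goes out...
        // ...but its listener is never invoked, so permits.release() never runs.

        System.out.println("second add(): waiting for a permit");
        permits.acquire();                      // parks here, like our OracleFileDecoder115 thread
        System.out.println("never reached");
    }
}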
public boolean createBulkConnection() {
    boolean processorFlag = false;
    try {
        ByteSizeValue size = new ByteSizeValue(AtomIndexerConfigParamsEnum.ES_BULK_SIZE.getIntValue(), ByteSizeUnit.KB);

        BulkProcessor.Listener listener = new BulkProcessor.Listener() {
            @Override
            public void beforeBulk(long executionId, BulkRequest request) {
            }

            @Override
            public void afterBulk(long executionId, BulkRequest request, BulkResponse response) {
                try {
                    if (response.hasFailures()) {
                        logger.error("********************************************BULK FAILURE******************************************");
                    } else {
                        logger.debug("******************************************** AFTER BULK SUCCESS******************************************");
                    }
                } catch (Exception e) {
                    logger.error(e.getMessage(), e);
                }
            }

            @Override
            public void afterBulk(long executionId, BulkRequest request, Throwable failure) {
                try {
                    logger.error("********************************************BULK FAILURE THROWABLE******************************************");
                    logger.error("---------------Exception in bulk insertion-----------------------", failure);
                } catch (Exception e) {
                    logger.error(e.getMessage(), e);
                }
            }
        };

        BulkProcessor.Builder builder = BulkProcessor.builder(
                (request, bulkListener) -> clientRest.bulkAsync(request, RequestOptions.DEFAULT, bulkListener),
                listener);
        builder.setBulkActions(AtomIndexerConfigParamsEnum.ES_Bulk_Action.getIntValue()); // 7500
        builder.setBulkSize(size);
        builder.setConcurrentRequests(AtomIndexerConfigParamsEnum.ES_Concurrent_Requests.getIntValue());
        builder.setFlushInterval(TimeValue.timeValueSeconds(AtomIndexerConfigParamsEnum.ES_Flush_Interval.getIntValue()));
        builder.setBackoffPolicy(BackoffPolicy.constantBackoff(TimeValue.timeValueSeconds(1L),
                AtomIndexerConfigParamsEnum.ES_Max_No_Of_Retries_BackoffPolicy.getIntValue()));

        processor = builder.build();
    } catch (Exception e) {
        logger.error(e.getMessage(), e);
    }
    if (processor != null) {
        processorFlag = true;
    }
    return processorFlag;
}
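In case it helps with diagnosis, this is a sketch of a more verbose listener we could swap in so that per-item failures and the failure status show up in our logs (in 7.x a circuit_breaking_exception should come back as HTTP 429). verboseListener is just our name for it; it assumes our existing logger field and an import of org.elasticsearch.ElasticsearchException.

// Sketch only: a listener that logs what actually failed and with which status.
BulkProcessor.Listener verboseListener = new BulkProcessor.Listener() {
    @Override
    public void beforeBulk(long executionId, BulkRequest request) {
        logger.debug("Bulk " + executionId + ": sending " + request.numberOfActions() + " actions");
    }

    @Override
    public void afterBulk(long executionId, BulkRequest request, BulkResponse response) {
        if (response.hasFailures()) {
            // buildFailureMessage() includes per-item failures such as 429 rejections
            logger.error("Bulk " + executionId + " had failures: " + response.buildFailureMessage());
        } else {
            logger.debug("Bulk " + executionId + " ok in " + response.getTook().getMillis() + " ms");
        }
    }

    @Override
    public void afterBulk(long executionId, BulkRequest request, Throwable failure) {
        if (failure instanceof ElasticsearchException) {
            // a circuit_breaking_exception should surface here with status TOO_MANY_REQUESTS (429)
            logger.error("Bulk " + executionId + " failed with status "
                    + ((ElasticsearchException) failure).status(), failure);
        } else {
            logger.error("Bulk " + executionId + " failed", failure);
        }
    }
};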
Bulk request:

if (ESConnection.getInstance().getBulkConnection() != null) {
    ESConnection.getInstance().getBulkConnection()
            .add(new IndexRequest(indexAlias, "_doc").source(json).id(id));
}
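Side note: we are still using the typed IndexRequest constructor, which is deprecated in 7.x. If it matters, the typeless form would look roughly like this (assuming json is a Map<String, Object>; a raw JSON string would need source(jsonString, XContentType.JSON) instead):

// Typeless equivalent for 7.x (sketch; assumes json is a Map<String, Object>)
ESConnection.getInstance().getBulkConnection()
        .add(new IndexRequest(indexAlias)
                .id(id)
                .source(json));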
We are stuck after getting the circuit_breaking_exception, as the entire application stops indexing and never recovers. Kindly help us with this and let us know how we can resolve it.
Regards,
Vinay Gayki