I have a Java SE application which is using transport client and high level rest client for indexing data into elasticsearch, I am using a bulk processor to process the requests (index,delete,update) in bulk, like this
processor.add(client.prepareIndex(index, type).setSource(s).request());
The bulk processor is getting created like
BulkProcessor
.builder(client, initBulkProcessorListener())
.setBulkActions(Integer.valueOf(getEsProperty("elasticsearch.bulkrequestsize")))
.setBulkSize(new ByteSizeValue(5, ByteSizeUnit.MB))
.setFlushInterval(TimeValue.timeValueSeconds(5))
.setConcurrentRequests(1)
.setBackoffPolicy(BackoffPolicy.exponentialBackoff())
.build();
i do a flush of pending requests using the following method
public void flushPendingRequests(BulkProcessor processor) {
processor.flush();
try {
processor.awaitClose(10000, TimeUnit.MILLISECONDS);
} catch (InterruptedException e) {
LOGGER.error("Interrupted Exception : " + e.getMessage());
Thread.currentThread().interrupt();
}
processor.close();
}
later on i am closing the client instance using the following method
public void close() {
LOGGER.info("Closing transport client");
if (client == null) {
LOGGER.info("Transport client already closed");
return;
}
// This is to prevent closed channel exceptions
shutdownThreadPool(client.threadPool().scheduler());
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
e.printStackTrace();
}
client.close();
}
private static void shutdownThreadPool(ScheduledExecutorService pool) {
pool.shutdown();
try {
boolean terminated = pool.awaitTermination(THREADPOOL_TIMEOUT, THREADPOOL_TIMEOUT_UNIT);
if (!terminated) {
LOGGER.warn("Thread pool timeout elapsed before termination, wait again for " + THREADPOOL_TIMEOUT + " "
+ THREADPOOL_TIMEOUT_UNIT.toString() + "...");
pool.shutdownNow();
if (!pool.awaitTermination(THREADPOOL_TIMEOUT, THREADPOOL_TIMEOUT_UNIT)) {
LOGGER.error("Thread pool did not terminate");
}
}
} catch (InterruptedException e) {
LOGGER.error("Error occurred while shutting down the thread pool: " + e.getMessage(), e);
pool.shutdownNow();
Thread.currentThread().interrupt();
}
}
Even though everything seems to be working fine, i always get closeChannel Exceptions in elasticsearch logs when the application shuts down, it disappears when i add a Thread.sleep(1000); though, i don't seem to understand what the problem is exactly, help is much appreciated.
Cheers!