Close Channel Exceptions while using transport client with bulkprocessor

I have a Java SE application which is using transport client and high level rest client for indexing data into elasticsearch, I am using a bulk processor to process the requests (index,delete,update) in bulk, like this

 processor.add(client.prepareIndex(index, type).setSource(s).request());

The bulk processor is getting created like

BulkProcessor
    .builder(client, initBulkProcessorListener())
    .setBulkActions(Integer.valueOf(getEsProperty("elasticsearch.bulkrequestsize")))
    .setBulkSize(new ByteSizeValue(5, ByteSizeUnit.MB))
    .setFlushInterval(TimeValue.timeValueSeconds(5))
    .setConcurrentRequests(1)
    .setBackoffPolicy(BackoffPolicy.exponentialBackoff())
    .build();

i do a flush of pending requests using the following method

public void flushPendingRequests(BulkProcessor processor) {
 processor.flush();
 try {
  processor.awaitClose(10000, TimeUnit.MILLISECONDS);
 } catch (InterruptedException e) {
  LOGGER.error("Interrupted Exception : " + e.getMessage());
  Thread.currentThread().interrupt();
 }
 processor.close();
 }

later on i am closing the client instance using the following method

  public void close() {
LOGGER.info("Closing transport client");
if (client == null) {
  LOGGER.info("Transport client already closed");
  return;
}

// This is to prevent closed channel exceptions
shutdownThreadPool(client.threadPool().scheduler());
try {
  Thread.sleep(1000);
} catch (InterruptedException e) {
  e.printStackTrace();
}
client.close();

}

private static void shutdownThreadPool(ScheduledExecutorService pool) {
pool.shutdown();
try {
  boolean terminated = pool.awaitTermination(THREADPOOL_TIMEOUT, THREADPOOL_TIMEOUT_UNIT);
  if (!terminated) {
    LOGGER.warn("Thread pool timeout elapsed before termination, wait again for " + THREADPOOL_TIMEOUT + " "
        + THREADPOOL_TIMEOUT_UNIT.toString() + "...");
    pool.shutdownNow();
    if (!pool.awaitTermination(THREADPOOL_TIMEOUT, THREADPOOL_TIMEOUT_UNIT)) {
      LOGGER.error("Thread pool did not terminate");
    }
  }
} catch (InterruptedException e) {
  LOGGER.error("Error occurred while shutting down the thread pool: " + e.getMessage(), e);
  pool.shutdownNow();
  Thread.currentThread().interrupt();
}
}

Even though everything seems to be working fine, i always get closeChannel Exceptions in elasticsearch logs when the application shuts down, it disappears when i add a Thread.sleep(1000); though, i don't seem to understand what the problem is exactly, help is much appreciated.
Cheers!

This topic was automatically closed 28 days after the last reply. New replies are no longer allowed.