EsRejectedExecutionException

Hi,
I am using the Java client to continuously ingest data from Kafka into Elasticsearch, version 2.2.
I run four threads that continuously submit bulk ingestion tasks, each with a bulk load of 5000 transactions.
Before a thread submits an ingestion task, it checks the current size of the bulk thread pool queue and only submits the task if the queue size is less than 40.
Following is the code I use to check the queue:

public boolean checkQueue() {
    final NodesStatsResponse response = client.admin().cluster().prepareNodesStats()
            .setThreadPool(true).execute().actionGet();
    for (final NodeStats nodeStats : response.getNodes()) {
        final ThreadPoolStats stats = nodeStats.getThreadPool();
        if (stats == null) {
            continue;
        }
        for (final ThreadPoolStats.Stats threadPoolStat : stats) {
            if (threadPoolStat.getName().equals("bulk") && threadPoolStat.getQueue() >= 40) {
                logger.info("Stuck in checkQueue");
                EsConsumerMetricsHandler.es_struckQueue.mark(1);
                Thread.yield();
                return checkQueue(); // retry until the queue drains below 40
            }
        }
    }
    EsConsumerMetricsHandler.es_struckQueue.mark(0);
    return true;
}

With this, the bulk indexing queue should never fill up beyond 50, right? Whenever it is above 40, our indexing threads wait until it drops below 40 instead of submitting the task. Even so, I got the following exception:

2016-07-27 06:54:18 ERROR KafkaMultiConsumer:104 - Failure message failure in bulk execution:
[0]: index [index_14_log_27jul2016], type [80959], id [6311894190618902564], message [RemoteTransportException[[es-strong2.zs2-na4][172.31.19.128:9300][indices:data/write/bulk[s][p]]]; nested: EsRejectedExecutionException[rejected execution of org.elasticsearch.transport.TransportService$4@7e40df4d on EsThreadPoolExecutor[bulk, queue capacity = 50, org.elasticsearch.common.util.concurrent.EsThreadPoolExecutor@5e1d84f[Running, pool size = 8, active threads = 8, queued tasks = 50, completed tasks = 5450089]]];]

I am wondering how this could be possible. Can someone please explain the scenario in which this can happen?

Hi @Sai_Birada,

"With this, the bulk indexing queue should never fill up beyond 50, right?"

No. This can still happen. For the reasons, see my answer in the topic Threadpool/ Queue size limitation unsolved. In that response I also suggest concrete mitigation techniques.

With your approach you essentially busy-wait until the queue drops below 40 items (Thread.yield() is usually implemented as a no-op in the JVM, and even if it weren't, it would not make the thread wait). Apart from repeatedly fetching the node stats (additional work for ES), you also have a check-then-act race: two threads can concurrently observe a queue size below 40 and both submit a bulk request.
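One common way to avoid both the busy-waiting and the race is to bound the number of in-flight bulk requests with a semaphore: a thread blocks on a permit instead of polling the node stats, and the bound is enforced atomically, so two threads can never "sneak past" the check together. The sketch below is illustrative only and runs against no cluster; `submitBulk` is a hypothetical stand-in for the real Elasticsearch bulk call, and the permit count of 4 is an assumed limit, not a value from the original post.

```java
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Semaphore;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

public class BoundedBulkSubmitter {
    // Assumed bound on concurrent bulk requests; tune to your cluster's bulk queue.
    static final int MAX_IN_FLIGHT = 4;
    static final Semaphore inFlight = new Semaphore(MAX_IN_FLIGHT);

    // Instrumentation for this demo: track concurrency and completed bulks.
    static final AtomicInteger current = new AtomicInteger();
    static final AtomicInteger maxObserved = new AtomicInteger();
    static final AtomicInteger completed = new AtomicInteger();

    // Hypothetical stand-in for client.prepareBulk()...execute().actionGet();
    // sleeps briefly to simulate network I/O.
    static void submitBulk() throws InterruptedException {
        int now = current.incrementAndGet();
        maxObserved.accumulateAndGet(now, Math::max);
        Thread.sleep(10);
        current.decrementAndGet();
        completed.incrementAndGet();
    }

    public static void main(String[] args) throws Exception {
        ExecutorService pool = Executors.newFixedThreadPool(16);
        for (int i = 0; i < 64; i++) {
            pool.submit(() -> {
                try {
                    inFlight.acquire();      // blocks until a permit is free: no polling
                    try {
                        submitBulk();
                    } finally {
                        inFlight.release();  // release only after the bulk completes
                    }
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                }
                return null;
            });
        }
        pool.shutdown();
        pool.awaitTermination(30, TimeUnit.SECONDS);
        System.out.println("max in flight = " + maxObserved.get());
    }
}
```

Because the permit is released only when the bulk actually completes, the number of outstanding requests can never exceed the permit count, regardless of how many submitter threads race to acquire.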

Daniel

Yes Daniel,
I ran into the same race condition; that's the root cause of the exception I was seeing. Thanks for your response.
