Hi,
I am using java client to ingest data continuously from kafka to elasticsearch and i am using elasticsearch 2.2 version.
I am running four threads which continuously submit bulk ingestion tasks with a bulk load of 5000 transactions.
Before each thread submits an ingestion task it will check the current size of bulk thead pool and if its size is less than 40 only then the thread will submit the task.
Following is the code I am using to check queue
public boolean checkQueue() {
final NodesStatsResponse response = client.admin().cluster().prepareNodesStats().setThreadPool(true).execute()
.actionGet();
final NodeStats[] nodeStats2 = response.getNodes();
for (NodeStats nodeStats3 : nodeStats2) {
ThreadPoolStats stats = nodeStats3.getThreadPool();
if (stats != null) {
for (ThreadPoolStats.Stats threadPoolStat : stats) {
if (threadPoolStat.getName().equals("bulk")) {
if (threadPoolStat.getQueue() >= 40) {
logger.info("Stuck in checkQueue");
EsConsumerMetricsHandler.es_struckQueue.mark(1);
Thread.yield();
return checkQueue();
}
}
}
}
}
EsConsumerMetricsHandler.es_struckQueue.mark(0);
return true;
}
With this never bulk indexing queue should fill up more than 50 right?. as when its more than 40, our indexing thread will wait until it comes below 40 instead of submitting the task. Even in this scenario, I got the following exception.
2016-07-27 06:54:18 ERROR KafkaMultiConsumer:104 - Failure message failure in bulk execution:
[0]: index [index_14_log_27jul2016], type [80959], id [6311894190618902564], message [RemoteTransportException[[es-strong2.zs2-na4][172.31.19.128:9300][indices:data/write/bulk[s][p]]]; nested: EsRejectedExecutionException[rejected execution of org.elasticsearch.transport.TransportService$4@7e40df4d on EsThreadPoolExecutor[bulk, queue capacity = 50, org.elasticsearch.common.util.concurrent.EsThreadPoolExecutor@5e1d84f[Running, pool size = 8, active threads = 8, queued tasks = 50, completed tasks = 5450089]]];]
I am wondering how could this be possible? Can some one please explain me the scenario at which this can be possible.