Getting errors in the ES Java Client when upserting the index

Hello,

We have something like 53K upsert (row update) requests going into an index, and we're seeing this exception:

java.util.concurrent.ExecutionException: RemoteTransportException[[prod525][xx.xx.xx.xx:9300][indices:data/write/update]]; nested: RemoteTransportException[[prod523][xx.xx.xx.xx:9300][indices:data/write/update[s]]]; nested: EsRejectedExecutionException[rejected execution of org.elasticsearch.transport.netty.MessageChannelHandler$RequestHandler@26aec5a4 on EsThreadPoolExecutor[index, queue capacity = 200, org.elasticsearch.common.util.concurrent.EsThreadPoolExecutor@4f467769[Running, pool size = 32, active threads = 32, queued tasks = 200, completed tasks = 2797278]]];
    at org.elasticsearch.common.util.concurrent.BaseFuture$Sync.getValue(BaseFuture.java:290) ~[elasticsearch-2.1.1.jar:2.1.1]
    at org.elasticsearch.common.util.concurrent.BaseFuture$Sync.get(BaseFuture.java:277) ~[elasticsearch-2.1.1.jar:2.1.1]
    at org.elasticsearch.common.util.concurrent.BaseFuture.get(BaseFuture.java:116) ~[elasticsearch-2.1.1.jar:2.1.1]
    at com.yp.argo.etl.writer.db.PostETL.lambda$write$6(PostETL.java:241) ~[classes/:na]
    at java.util.stream.ForEachOps$ForEachOp$OfRef.accept(ForEachOps.java:184) ~[na:1.8.0_60]
    at java.util.HashMap$KeySpliterator.forEachRemaining(HashMap.java:1540) ~[na:1.8.0_60]
    at java.util.stream.AbstractPipeline.copyInto(AbstractPipeline.java:481) ~[na:1.8.0_60]
    at java.util.stream.ForEachOps$ForEachTask.compute(ForEachOps.java:291) ~[na:1.8.0_60]
    at java.util.concurrent.CountedCompleter.exec(CountedCompleter.java:731) ~[na:1.8.0_60]
    at java.util.concurrent.ForkJoinTask.doExec(ForkJoinTask.java:289) ~[na:1.8.0_60]
    at java.util.concurrent.ForkJoinTask.doInvoke(ForkJoinTask.java:401) ~[na:1.8.0_60]
    at java.util.concurrent.ForkJoinTask.invoke(ForkJoinTask.java:734) ~[na:1.8.0_60]
    at java.util.stream.ForEachOps$ForEachOp.evaluateParallel(ForEachOps.java:160) ~[na:1.8.0_60]
    at java.util.stream.ForEachOps$ForEachOp$OfRef.evaluateParallel(ForEachOps.java:174) ~[na:1.8.0_60]
    at java.util.stream.AbstractPipeline.evaluate(AbstractPipeline.java:233) ~[na:1.8.0_60]
    at java.util.stream.ReferencePipeline.forEach(ReferencePipeline.java:418) ~[na:1.8.0_60]
    at java.util.stream.ReferencePipeline$Head.forEach(ReferencePipeline.java:583) ~[na:1.8.0_60]
    at com.yp.argo.etl.writer.db.PostETL.write(PostETL.java:238) ~[classes/:na]
    at com.yp.argo.etl.parser.models.ETLObject.pushETL(ETLObject.java:183) ~[classes/:na]
    at com.yp.argo.etl.parser.models.ETLObject.documentProcess(ETLObject.java:188) ~[classes/:na]
    at com.yp.argo.etl.parser.listener.HtmlETLConsumerListener.process(HtmlETLConsumerListener.java:41) ~[classes/:na]
    at com.yp.argo.etl.parser.listener.MessageWorker.run(MessageWorker.java:52) ~[classes/:na]
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142) ~[na:1.8.0_60]
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617) ~[na:1.8.0_60]
    at java.lang.Thread.run(Thread.java:745) ~[na:1.8.0_60]
Caused by: org.elasticsearch.transport.RemoteTransportException: [prod525][10.1.14.225:9300][indices:data/write/update]
Caused by: org.elasticsearch.transport.RemoteTransportException: [prod523][10.1.14.200:9300][indices:data/write/update[s]]
Caused by: org.elasticsearch.common.util.concurrent.EsRejectedExecutionException: rejected execution of org.elasticsearch.transport.netty.MessageChannelHandler$RequestHandler@26aec5a4 on EsThreadPoolExecutor[index, queue capacity = 200, org.elasticsearch.common.util.concurrent.EsThreadPoolExecutor@4f467769[Running, pool size = 32, active threads = 32, queued tasks = 200, completed tasks = 2797278]]
    at org.elasticsearch.common.util.concurrent.EsAbortPolicy.rejectedExecution(EsAbortPolicy.java:50) ~[elasticsearch-2.1.1.jar:2.1.1]
    at java.util.concurrent.ThreadPoolExecutor.reject(ThreadPoolExecutor.java:823) ~[na:1.8.0_60]
    at java.util.concurrent.ThreadPoolExecutor.execute(ThreadPoolExecutor.java:1369) ~[na:1.8.0_60]
    at org.elasticsearch.common.util.concurrent.EsThreadPoolExecutor.execute(EsThreadPoolExecutor.java:85) ~[elasticsearch-2.1.1.jar:2.1.1]
    at org.elasticsearch.transport.netty.MessageChannelHandler.handleRequest(MessageChannelHandler.java:246) ~[elasticsearch-2.1.1.jar:2.1.1]
    at org.elasticsearch.transport.netty.MessageChannelHandler.messageReceived(MessageChannelHandler.java:114) ~[elasticsearch-2.1.1.jar:2.1.1]
    at org.jboss.netty.channel.SimpleChannelUpstreamHandler.handleUpstream(SimpleChannelUpstreamHandler.java:70) ~[netty-3.10.5.Final.jar:na]
    at org.jboss.netty.channel.DefaultChannelPipeline.sendUpstream(DefaultChannelPipeline.java:564) ~[netty-3.10.5.Final.jar:na]
    at org.jboss.netty.channel.DefaultChannelPipeline$DefaultChannelHandlerContext.sendUpstream(DefaultChannelPipeline.java:791) ~[netty-3.10.5.Final.jar:na]
    at org.jboss.netty.channel.Channels.fireMessageReceived(Channels.java:296) ~[netty-3.10.5.Final.jar:na]
    at

Looks like you're trying to index each item individually and hitting the queue limit (200). That generally means you're trying to index faster than ES can keep up with. I'd suggest using the bulk API for the indexing instead, as it's likely more efficient.

https://www.elastic.co/guide/en/elasticsearch/reference/current/docs-bulk.html
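
If it helps, here's a rough sketch of what batching the upserts could look like with the 2.x transport client's BulkProcessor. The index name, type, id/JSON arguments, and the flush thresholds below are all made up, so adjust them to your setup:

import org.elasticsearch.action.bulk.BulkProcessor;
import org.elasticsearch.action.bulk.BulkRequest;
import org.elasticsearch.action.bulk.BulkResponse;
import org.elasticsearch.action.update.UpdateRequest;
import org.elasticsearch.client.Client;
import org.elasticsearch.common.unit.ByteSizeUnit;
import org.elasticsearch.common.unit.ByteSizeValue;
import org.elasticsearch.common.unit.TimeValue;

public class BulkUpserter {

    // Batches individual upserts into bulk requests instead of sending one
    // update request per row, which is what fills the "index" queue.
    public static BulkProcessor build(Client client) {
        return BulkProcessor.builder(client, new BulkProcessor.Listener() {
            @Override
            public void beforeBulk(long executionId, BulkRequest request) {
                // e.g. log request.numberOfActions()
            }

            @Override
            public void afterBulk(long executionId, BulkRequest request, BulkResponse response) {
                if (response.hasFailures()) {
                    // some items were rejected; see response.buildFailureMessage()
                }
            }

            @Override
            public void afterBulk(long executionId, BulkRequest request, Throwable failure) {
                // the whole bulk request failed, e.g. the node was unreachable
            }
        })
                .setBulkActions(1000)                               // flush every 1000 actions...
                .setBulkSize(new ByteSizeValue(5, ByteSizeUnit.MB)) // ...or every ~5 MB...
                .setFlushInterval(TimeValue.timeValueSeconds(5))    // ...or every 5 seconds
                .setConcurrentRequests(1)                           // at most one bulk in flight
                .build();
    }

    // One row update becomes one action inside a bulk request.
    public static void upsert(BulkProcessor bulkProcessor, String id, String jsonDoc) {
        bulkProcessor.add(new UpdateRequest("myindex", "mytype", id)
                .doc(jsonDoc)            // partial document as a JSON string
                .docAsUpsert(true));     // insert it if it doesn't exist yet
    }
}

With setConcurrentRequests(1) there is at most one bulk in flight at a time, which also slows your producer down a bit rather than letting it flood the index thread pool.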

If you're not able to use that for whatever reason, you can increase the thread pool queue size in ES (check the docs), but that is likely just postponing the issue.
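
If you do go that route, I believe on 2.x the thread pool settings can still be changed dynamically through the cluster update settings API (please double-check the docs for your exact version). A rough sketch with an arbitrary value of 500, assuming client is your transport Client:

// Sketch only: raise the "index" thread pool queue from the default 200 to 500.
// If the cluster genuinely can't keep up, this only delays the rejections.
client.admin().cluster().prepareUpdateSettings()
        .setTransientSettings(Settings.settingsBuilder()
                .put("threadpool.index.queue_size", 500)
                .build())
        .get();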

What is the optimal threshold for the thread pool queue in ES? I read that it can affect other things like search, merges, etc.

That depends on your needs, your server, and your config. IOW, it's pretty hard to say. The defaults are fairly sane, though - bump up the queue and you could run into memory issues if your $ES_HEAP_SIZE isn't sized appropriately.

Though for large numbers of inserts, do use the bulk API - that's what it's there for.

Increasing queue sizes is not an effective solution to saturated queues. An execution queue fills up if and only if Elasticsearch cannot keep up with the rate of arriving requests, and if it cannot keep up, a larger queue does not address that problem: the new queue will also fill up, it will just take longer. You either have to throttle the rate at which you send requests to Elasticsearch (which is exactly what a rejected execution exception is asking you to do; it's a form of backpressure), add more resources to your cluster (whether better hardware than you currently have, or more of it), or use the bulk API to combine multiple requests into one.
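
To make the throttling point concrete, here is a rough sketch of what client-side backpressure handling can look like with the 2.x Java client: catch the rejection, wait, and resend. The class name, method name, retry count, and delays are all made up:

import org.elasticsearch.ElasticsearchException;
import org.elasticsearch.ExceptionsHelper;
import org.elasticsearch.action.update.UpdateRequestBuilder;
import org.elasticsearch.action.update.UpdateResponse;
import org.elasticsearch.common.util.concurrent.EsRejectedExecutionException;

public class BackoffUpsert {

    // Retries an upsert when the cluster rejects it because the thread pool
    // queue is full, backing off between attempts so the queue can drain.
    public static UpdateResponse upsertWithBackoff(UpdateRequestBuilder upsert)
            throws InterruptedException {
        long waitMillis = 500;                    // initial pause, doubled on each rejection
        final int maxAttempts = 5;                // give up after a handful of tries
        for (int attempt = 1; ; attempt++) {
            try {
                return upsert.get();              // synchronous execute-and-wait
            } catch (ElasticsearchException e) {
                boolean rejected =
                        ExceptionsHelper.unwrapCause(e) instanceof EsRejectedExecutionException;
                if (!rejected || attempt >= maxAttempts) {
                    throw e;                      // a different failure, or we're out of retries
                }
                Thread.sleep(waitMillis);         // back off; this is the backpressure signal
                waitMillis *= 2;
            }
        }
    }
}

That said, combining this kind of retry with the bulk API sketched above is usually the cleaner fix, since batching alone removes most of the pressure on the index thread pool.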