EsRejectedExecutionException using _msearch

Hi everyone, I am very new to Elasticsearch. I looked around for an answer to my question but couldn't find one in the documentation or in a similar post here.

In my system, I currently have 2 nodes: one is a combined master and data node, and the other is a client node, with settings like:

Master node:

http.enabled: true
node.data: true
node.master: true
cluster.name: mvtest

Client node:

http.enabled: true
node.data: false
node.master: false
cluster.name: mvtest

All of the other settings are ES 1.5.2 defaults. When I try to post to the _msearch endpoint, using a file like:

{}
{"query": {"multi_match": {"operator": "and", "query": "1", "fields": ["_id"]}}}
{}
{"query": {"multi_match": {"operator": "and", "query": "2", "fields": ["_id"]}}}
{}
{"query": {"multi_match": {"operator": "and", "query": "3", "fields": ["_id"]}}}
…
{}
{"query": {"multi_match": {"operator": "and", "query": "999", "fields": ["_id"]}}}
{}
{"query": {"multi_match": {"operator": "and", "query": "1000", "fields": ["_id"]}}}

I get MANY errors in the logs of the form:

[2015-05-20 18:57:22,150][DEBUG][action.search.type       ] [master] [genedoc_mygene_allspecies_current_1][8], node[4ara0M7SReSEPKrZM2g5Gg], [P], s[STARTED]: Failed to execute [org.elasticsearch.action.search.SearchRequest@5104e7e5] lastShard [true]
org.elasticsearch.common.util.concurrent.EsRejectedExecutionException: rejected execution (queue capacity 1500) on org.elasticsearch.search.action.SearchServiceTransportAction$23@18e20fd8
        at org.elasticsearch.common.util.concurrent.EsAbortPolicy.rejectedExecution(EsAbortPolicy.java:62)
        at java.util.concurrent.ThreadPoolExecutor.reject(ThreadPoolExecutor.java:821)
        at java.util.concurrent.ThreadPoolExecutor.execute(ThreadPoolExecutor.java:1372)
        at org.elasticsearch.common.util.concurrent.EsThreadPoolExecutor.execute(EsThreadPoolExecutor.java:79)
        at org.elasticsearch.search.action.SearchServiceTransportAction.execute(SearchServiceTransportAction.java:551)
        at org.elasticsearch.search.action.SearchServiceTransportAction.sendExecuteQuery(SearchServiceTransportAction.java:228)
        at org.elasticsearch.action.search.type.TransportSearchQueryThenFetchAction$AsyncAction.sendExecuteFirstPhase(TransportSearchQueryThenFetchAction.java:83)
        at org.elasticsearch.action.search.type.TransportSearchTypeAction$BaseAsyncAction.performFirstPhase(TransportSearchTypeAction.java:176)
        at org.elasticsearch.action.search.type.TransportSearchTypeAction$BaseAsyncAction.start(TransportSearchTypeAction.java:158)
        at org.elasticsearch.action.search.type.TransportSearchQueryThenFetchAction.doExecute(TransportSearchQueryThenFetchAction.java:62)
        at org.elasticsearch.action.search.type.TransportSearchQueryThenFetchAction.doExecute(TransportSearchQueryThenFetchAction.java:52)
        at org.elasticsearch.action.support.TransportAction.execute(TransportAction.java:75)
        at org.elasticsearch.action.search.TransportSearchAction.doExecute(TransportSearchAction.java:100)
        at org.elasticsearch.action.search.TransportSearchAction.doExecute(TransportSearchAction.java:43)
        at org.elasticsearch.action.support.TransportAction.execute(TransportAction.java:75)
        at org.elasticsearch.action.search.TransportMultiSearchAction.doExecute(TransportMultiSearchAction.java:62)
        at org.elasticsearch.action.search.TransportMultiSearchAction.doExecute(TransportMultiSearchAction.java:39)
        at org.elasticsearch.action.support.TransportAction.execute(TransportAction.java:75)
        at org.elasticsearch.client.node.NodeClient.execute(NodeClient.java:98)
        at org.elasticsearch.client.FilterClient.execute(FilterClient.java:66)
        at org.elasticsearch.rest.BaseRestHandler$HeadersAndContextCopyClient.execute(BaseRestHandler.java:92)
        at org.elasticsearch.client.support.AbstractClient.multiSearch(AbstractClient.java:364)
        at org.elasticsearch.rest.action.search.RestMultiSearchAction.handleRequest(RestMultiSearchAction.java:66)
        at org.elasticsearch.rest.BaseRestHandler.handleRequest(BaseRestHandler.java:53)
        at org.elasticsearch.rest.RestController.executeHandler(RestController.java:225)
        at org.elasticsearch.rest.RestController.dispatchRequest(RestController.java:170)
        at org.elasticsearch.http.HttpServer.internalDispatchRequest(HttpServer.java:121)
        at org.elasticsearch.http.HttpServer$Dispatcher.dispatchRequest(HttpServer.java:83)
        at org.elasticsearch.http.netty.NettyHttpServerTransport.dispatchRequest(NettyHttpServerTransport.java:329)
        at org.elasticsearch.http.netty.HttpRequestHandler.messageReceived(HttpRequestHandler.java:63)
        at org.elasticsearch.common.netty.channel.SimpleChannelUpstreamHandler.handleUpstream(SimpleChannelUpstreamHandler.java:70)
        at org.elasticsearch.common.netty.channel.DefaultChannelPipeline.sendUpstream(DefaultChannelPipeline.java:564)
        at org.elasticsearch.common.netty.channel.DefaultChannelPipeline$DefaultChannelHandlerContext.sendUpstream(DefaultChannelPipeline.java:791)
        at org.elasticsearch.http.netty.pipelining.HttpPipeliningHandler.messageReceived(HttpPipeliningHandler.java:60)
        at org.elasticsearch.common.netty.channel.SimpleChannelHandler.handleUpstream(SimpleChannelHandler.java:88)
        at org.elasticsearch.common.netty.channel.DefaultChannelPipeline.sendUpstream(DefaultChannelPipeline.java:564)
        at org.elasticsearch.common.netty.channel.DefaultChannelPipeline$DefaultChannelHandlerContext.sendUpstream(DefaultChannelPipeline.java:791)
        at org.elasticsearch.common.netty.channel.Channels.fireMessageReceived(Channels.java:296)
        at org.elasticsearch.common.netty.handler.codec.http.HttpChunkAggregator.messageReceived(HttpChunkAggregator.java:194)
        at org.elasticsearch.common.netty.channel.SimpleChannelUpstreamHandler.handleUpstream(SimpleChannelUpstreamHandler.java:70)
        at org.elasticsearch.common.netty.channel.DefaultChannelPipeline.sendUpstream(DefaultChannelPipeline.java:564)
        at org.elasticsearch.common.netty.channel.DefaultChannelPipeline$DefaultChannelHandlerContext.sendUpstream(DefaultChannelPipeline.java:791)
        at org.elasticsearch.common.netty.OpenChannelsHandler.handleUpstream(OpenChannelsHandler.java:74)
        at org.elasticsearch.common.netty.channel.DefaultChannelPipeline.sendUpstream(DefaultChannelPipeline.java:564)
        at org.elasticsearch.common.netty.channel.DefaultChannelPipeline.sendUpstream(DefaultChannelPipeline.java:559)
        at org.elasticsearch.common.netty.channel.Channels.fireMessageReceived(Channels.java:268)
        at org.elasticsearch.common.netty.channel.Channels.fireMessageReceived(Channels.java:255)
        at org.elasticsearch.common.netty.channel.socket.nio.NioWorker.read(NioWorker.java:88)
        at org.elasticsearch.common.netty.channel.socket.nio.AbstractNioWorker.process(AbstractNioWorker.java:108)
        at org.elasticsearch.common.netty.channel.socket.nio.AbstractNioSelector.run(AbstractNioSelector.java:337)
        at org.elasticsearch.common.netty.channel.socket.nio.AbstractNioWorker.run(AbstractNioWorker.java:89)
        at org.elasticsearch.common.netty.channel.socket.nio.NioWorker.run(NioWorker.java:178)
        at org.elasticsearch.common.netty.util.ThreadRenamingRunnable.run(ThreadRenamingRunnable.java:108)
        at org.elasticsearch.common.netty.util.internal.DeadLockProofWorker$1.run(DeadLockProofWorker.java:42)
        at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
        at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
        at java.lang.Thread.run(Thread.java:745)

Interestingly, when I use the _msearch endpoint on the master node, the query returns, but these errors fill the logs. When I use the _msearch endpoint on the client node using the same query, it will often (but not always) trigger a timeout, and then fill the logs with these errors.

I assume it must have something to do with the thread pool queue, but after reading the ES documentation, I’m not sure how to fix this. I have tried making the search queue unbounded using type: cached, and I have also tried changing the search queue size, all to no avail. However, when the queue is unbounded, the error changes to a TransportService not closed error.

I’m still very new to elasticsearch, so I’m hoping someone out there will have some ideas about what’s happening here.

Thanks so much, and let me know if I forgot any relevant background information.

Hey, the rejection failures mean that you have overloaded the search thread pool by executing too many concurrent search requests.

The search thread pool defaults to ~2x the number of cores you have, with 1000 items in the queue. Any requests beyond that are rejected. Each execution is a shard-level execution (so a search on 5 shards, on a single node, will require 5 concurrent threads).
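
To make the arithmetic concrete, here is a rough sketch comparing the number of shard-level tasks a single msearch request generates with what the search pool on one node can hold. The core count, the shard count, and the helper names are assumptions made up for illustration; the queue size of 1000 and the ~2x figure come from the paragraph above. It is also a simplification: tasks finish while others are still arriving, so not every task beyond the capacity is necessarily rejected.

```python
def shard_tasks(num_searches, shards_per_search):
    """Each search fans out into one task per shard it touches."""
    return num_searches * shards_per_search


def pool_capacity(cores, queue_size=1000, threads_per_core=2):
    """Tasks a node can hold at once: running threads plus queued items."""
    return cores * threads_per_core + queue_size


# Hypothetical numbers: 1000 searches in one msearch, an index with
# 10 shards, all held by a single data node with 8 cores.
tasks = shard_tasks(1000, 10)
capacity = pool_capacity(cores=8)
print(tasks, capacity, tasks > capacity)
```

With these assumed numbers, the request produces about ten times more shard-level tasks than the pool and queue can hold at once, which is consistent with the flood of rejections in the log.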

It is important not to change this mechanism, since it helps create back pressure when the node is overloaded, instead of overloading it further.

The multi search API simply goes and executes all the search requests concurrently. So if you have 10 search requests there, hitting all indices, it will concurrently hit all indices and shards, which might cause the data nodes to start rejecting requests since they are being overloaded concurrently.

I hope my explanation makes sense. If you need the msearch execution to work across all the indices and shards you have, you probably need a few more nodes so that you have enough capacity to execute it. Another option is to execute the searches serially, without msearch, which will add to the latency of the execution.
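
One way to bound the concurrency from the client side is sketched below: split the 1000 queries into smaller groups and send one group at a time. This is only an illustration, not something Elasticsearch does for you. The `send` function is hypothetical (it stands for whatever code posts a body to the _msearch endpoint), and the batch size of 50 is an arbitrary assumption; a batch size of 1 is effectively the serial approach.

```python
import json


def chunked(items, size):
    """Yield consecutive slices of `items` with at most `size` elements."""
    for start in range(0, len(items), size):
        yield items[start:start + size]


def msearch_body(queries):
    """Build a newline-delimited body: an empty header line, then the query."""
    lines = []
    for query in queries:
        lines.append(json.dumps({}))
        lines.append(json.dumps(query))
    return "\n".join(lines) + "\n"


def run_in_batches(queries, send, batch_size=50):
    """Send the queries as several small requests, one after another.

    `send` is a caller-supplied (hypothetical) function that posts one body
    to the _msearch endpoint and returns the response.
    """
    return [send(msearch_body(batch)) for batch in chunked(queries, batch_size)]


# The same 1000 queries as in the question, generated instead of read from a file.
queries = [
    {"query": {"multi_match": {"operator": "and", "query": str(i), "fields": ["_id"]}}}
    for i in range(1, 1001)
]
bodies = [msearch_body(batch) for batch in chunked(queries, 50)]
print(len(bodies))
```

Smaller batches put fewer shard-level tasks on the data node at once, at the cost of more round trips and higher total latency.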