Cluster node unresponsive after search

Hi all, we're developing a Java plugin with some custom REST actions, and we are experiencing cluster node hangs/unresponsiveness after certain endpoints are called. The common denominator seems to be a section of code where threads are spawned and several searches are executed in parallel using a shared org.elasticsearch.client.Client.

Our cluster consists of 3 nodes: 2 master nodes and one data node. Only the node handling the request seems to be affected, but both master nodes are prone to the issue.

  • ES version: 2.3.1
  • JVM version: 1.8.0_40
  • OS: Linux, kernel 2.6.32-358.23.2.el6.x86_64

Notes: when the node goes unresponsive, some functionality remains unaffected. For example, one of our endpoints that only performs a simple get still works without issues, while another that performs an unthreaded search does not. Some APIs also stop responding, such as /_cat (at least _cat/thread_pool and _cat/plugins), while others still mostly work (_nodes/node_name/_all, _cluster/health).
The node does not seem to recover by itself; restarting it brings it back until the error is reproduced.
ES contains around 5M documents (synced from a MongoDB with mongo_connector).
Tests without the threading seem to yield no issues.
Running jstack on the unresponsive node shows a thread in Thread.State: WAITING (parking), waiting forever in actionGet(). Setting a timeout there caused a different issue (I can't remember which at the moment).
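A thread parked forever in a blocking get is what one would expect from a general pattern in which a callback running on a bounded thread pool waits synchronously for a result that only that same pool can deliver. A minimal plain-JDK sketch of that pattern follows; it is not Elasticsearch code, and `blockOnPool` and the task names are hypothetical. With one thread the inner result can never arrive and the bounded wait times out (similar to how a timeout on actionGet turns a hang into a failed call); with two threads it completes. Whether the GetResponse callback in this plugin actually runs on such a pool in ES 2.3.1 is an assumption, not something the jstack output confirms.

```java
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

// Hypothetical illustration only: plain JDK executors, not Elasticsearch's thread pools.
class BlockingInCallback {

    // Runs an "outer" task on a fixed pool of poolSize threads. The outer task
    // submits an "inner" task to the same pool and then blocks waiting for it,
    // the way a callback might block in actionGet() on a nested request.
    static String blockOnPool(int poolSize, long timeoutMillis)
            throws InterruptedException, ExecutionException {
        ExecutorService pool = Executors.newFixedThreadPool(poolSize);
        try {
            Future<String> outer = pool.submit(() -> {
                Future<String> inner = pool.submit(() -> "inner done");
                try {
                    // With one thread, the inner task sits in the queue behind
                    // this very task, so this wait can only end by timing out.
                    return inner.get(timeoutMillis, TimeUnit.MILLISECONDS);
                } catch (TimeoutException e) {
                    return "timed out";
                }
            });
            return outer.get();
        } finally {
            pool.shutdownNow();
        }
    }

    public static void main(String[] args) throws Exception {
        System.out.println("1 thread:  " + blockOnPool(1, 200));
        System.out.println("2 threads: " + blockOnPool(2, 5000));
    }
}
```

Without the timeout, the one-thread case would simply wait forever, which is the state jstack reports as WAITING (parking).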

I have been unable to reproduce the issue in my local test environment (a single node with one shard and no replicas, on Windows 7), but I can reproduce it on our cluster with the REST endpoint example below, usually after a couple of refreshes (1-5) before it hangs.

Searching for the issue has yielded little (no OOM etc.), so any input is welcome.

Test REST Endpoint:

package com.xxx.es.test.rest;

import static org.elasticsearch.rest.RestRequest.Method.GET;

import java.io.IOException;

import org.elasticsearch.action.ActionListener;
import org.elasticsearch.action.get.GetRequest;
import org.elasticsearch.action.get.GetResponse;
import org.elasticsearch.action.search.SearchResponse;
import org.elasticsearch.client.Client;
import org.elasticsearch.common.inject.Inject;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.rest.BaseRestHandler;
import org.elasticsearch.rest.BytesRestResponse;
import org.elasticsearch.rest.RestChannel;
import org.elasticsearch.rest.RestController;
import org.elasticsearch.rest.RestRequest;
import org.elasticsearch.rest.RestStatus;

public class TestAction extends BaseRestHandler {

    final static String EVENTS_INDEX = "allevents";
    final static String TYPE = "documents";
    private String eventIndices;

    @Inject
    public TestAction(Settings settings, RestController controller, Client client) {
        super(settings, controller, client);
        // Comma-separated list of indices; defaults to "allevents"
        this.eventIndices = settings.get("indexes.events", EVENTS_INDEX);
        controller.registerHandler(GET, "/_test/{id}", this);
    }

    @Override
    protected void handleRequest(final RestRequest request, final RestChannel channel, final Client client) throws Exception {
        final String id1 = request.param("id");
        // "\\s*,\\s*" splits on commas with optional surrounding whitespace
        final String[] indices = this.eventIndices.trim().split("\\s*,\\s*");

        GetRequest getRequest = new GetRequest(indices[0], TYPE, id1);

        client.get(getRequest, new ActionListener<GetResponse>() {

            @Override
            public void onResponse(GetResponse response) {

                Thread[] threads = new Thread[2];
                for (int i = 0; i < threads.length; i++) {
                    threads[i] = new Thread(new Runnable() {

                        @Override
                        public void run() {
                            // Blocking search on the shared client
                            SearchResponse srsp = client.prepareSearch(indices)
                                    .setTypes(TYPE)
                                    .setSize(0)
                                    .execute().actionGet();
                        }
                    });
                    // start() spawns a new thread; run() would execute the search on the calling thread
                    threads[i].start();
                }
                // Wait for both searches to finish before responding
                for (Thread thread : threads) {
                    try {
                        thread.join();
                    } catch (InterruptedException e) {
                        Thread.currentThread().interrupt();
                        onFailure(e);
                        return;
                    }
                }
                channel.sendResponse(new BytesRestResponse(RestStatus.OK, "Done."));
            }

            @Override
            public void onFailure(Throwable e) {
                // Always answer the REST request, even when the get fails
                try {
                    channel.sendResponse(new BytesRestResponse(channel, e));
                } catch (IOException e1) {
                    logger.error("Failed to send failure response", e1);
                }
            }
        });
    }
}
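Whether the two searches really run in parallel depends on calling Thread.start() rather than Thread.run(): run() is an ordinary method call that executes the Runnable on the calling thread, while start() creates a new thread that then calls run(). A minimal JDK-only check of which thread actually executes the Runnable (the class name and the "worker" thread name are illustrative):

```java
import java.util.concurrent.atomic.AtomicReference;

class RunVsStart {

    // Returns the name of the thread that actually executed the Runnable.
    static String executingThread(boolean useStart) throws InterruptedException {
        AtomicReference<String> name = new AtomicReference<>();
        Thread t = new Thread(() -> name.set(Thread.currentThread().getName()), "worker");
        if (useStart) {
            t.start(); // creates a new thread, which then calls run()
            t.join();  // wait for it; join() also makes its writes visible here
        } else {
            t.run();   // ordinary method call on the current thread; no new thread
        }
        return name.get();
    }

    public static void main(String[] args) throws InterruptedException {
        System.out.println("run():   " + executingThread(false));
        System.out.println("start(): " + executingThread(true));
    }
}
```

With run(), the reported name is that of the caller (in the endpoint, the thread delivering onResponse), so the searches execute one after another on that thread.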

Update after leaving the issue for a few days:
Still no solution, but we have noticed a couple of things:

  • Adding a timeout to the actionGet call only helps in that it no longer brings down the node every time; the calls still fail.

  • The problem can also be reproduced single-threaded, without retries, with just:

      client.get(getRequest, new ActionListener<GetResponse>() {
          @Override
          public void onResponse(GetResponse response) {
              for (int i = 0; i < 10; i++) {
                  SearchResponse srsp = client.prepareSearch(
                          eventIndices.trim().split("\\s*,\\s*"))
                          .setTypes(TYPE)
                          .setSize(0)
                          .execute().actionGet();
              }
              channel.sendResponse(new BytesRestResponse(RestStatus.OK, "Done."));
          }
          @Override
          public void onFailure(final Throwable e) {
              try {
                  channel.sendResponse(new BytesRestResponse(channel, e));
              } catch (IOException e1) {
                  logger.error("Failed to send failure response", e1);
              }
          }
      });
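Both snippets split the configured index list with String.split, which takes a regular expression. In a Java string literal the whitespace class has to be written "\\s"; "//s" is just two literal slashes followed by 's'. A quick JDK-only check, using a hypothetical index list:

```java
import java.util.Arrays;

class SplitIndices {

    // Trims the configured value and splits it with the given regex,
    // as both snippets do with the "indexes.events" setting.
    static String[] splitIndices(String value, String regex) {
        return value.trim().split(regex);
    }

    public static void main(String[] args) {
        String value = "index_a, index_b ,allevents"; // hypothetical index list
        // "\\s*,\\s*" is the regex \s*,\s*: a comma with optional whitespace around it
        System.out.println(Arrays.toString(splitIndices(value, "\\s*,\\s*")));
        // "//s*,//s*" requires literal "//" before and after the comma, so it never matches here
        System.out.println(Arrays.toString(splitIndices(value, "//s*,//s*")));
    }
}
```

With the default single index ("allevents") both patterns return the same one-element array, so the difference only shows once several indices are configured.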
    

It may be worth mentioning that the queue size for both the search and get thread pools is 1000, and it doesn't look like an overload issue.
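A queue size of 1000 with no rejections does not by itself rule out a stuck pool: if every worker thread of a pool is parked, new requests are still accepted into the queue and simply never run. A plain-JDK illustration follows; it uses a java.util.concurrent ThreadPoolExecutor, not Elasticsearch's own thread pool, and the latch stands in for a worker parked in a blocking call, which is an assumption about what is happening on the node.

```java
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

class StuckPoolQueue {

    // Parks the only worker of a pool with a 1000-slot queue, submits `tasks`
    // more jobs, and returns {queued, completed} measured while it is parked.
    static long[] queuedAndCompleted(int tasks) throws InterruptedException {
        ThreadPoolExecutor pool = new ThreadPoolExecutor(
                1, 1, 0L, TimeUnit.MILLISECONDS, new ArrayBlockingQueue<>(1000));
        CountDownLatch started = new CountDownLatch(1);
        CountDownLatch release = new CountDownLatch(1);
        pool.execute(() -> {
            started.countDown();
            try {
                release.await(); // stand-in for a worker parked in a blocking call
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        });
        started.await(); // make sure the worker is busy before queueing more work
        for (int i = 0; i < tasks; i++) {
            pool.execute(() -> { }); // accepted: fits in the queue, so not rejected
        }
        long[] result = {pool.getQueue().size(), pool.getCompletedTaskCount()};
        release.countDown(); // unpark the worker so the pool can drain and stop
        pool.shutdown();
        pool.awaitTermination(5, TimeUnit.SECONDS);
        return result;
    }

    public static void main(String[] args) throws InterruptedException {
        long[] r = queuedAndCompleted(10);
        System.out.println("queued=" + r[0] + " completed=" + r[1]);
    }
}
```

The submitted work is queued rather than rejected, yet nothing completes until the parked worker is released.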

If there is something strange in what we are doing, or if anyone has seen similar behaviour, it would be nice to know.

/O