Hi people, we're developing a plugin in Java with some custom REST actions, and we are experiencing cluster node hangs/unresponsiveness after certain endpoints are run. The common denominator seems to be a section where threads are spawned and some searches are executed in parallel using a shared org.elasticsearch.client.Client.
Our cluster consists of 3 nodes: 2 master nodes and one data node. Only the node handling the request seems to be affected, but both master nodes are prone to the issue.
- ES version: 2.3.1
- JVM version: 1.8.0_40
- OS version: 2.6.32-358.23.2.el6.x86_64 (Linux)
Note: when the node goes unresponsive, some functionality remains unaffected. For example, one of our endpoints that only performs a simple get still works without issues, while another that performs an unthreaded search does not. Some built-in endpoints also stop working, like /_cat (at least _cat/thread_pool, _cat/plugins), while others still mostly work (_node/node_name/_all, _cluster/health).
The node does not seem to recover by itself; restarting the node brings it back until the error is reproduced again.
ES contains around 5 million documents (connected to a MongoDB instance with mongo_connector).
Tests without the threading seem to yield no issues.
Running jstack on the unresponsive node shows a thread in Thread.State: WAITING (parking) that waits forever inside the actionGet() call. Setting a timeout there led to a different issue (can't remember which atm).
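The WAITING (parking) state seen in jstack is what the JVM reports for any thread blocked in an untimed wait on a future that never completes. Below is a minimal plain-JDK sketch of that behaviour, assuming actionGet() behaves like an untimed Future.get(). It is not Elasticsearch code: CompletableFuture stands in for the future returned by execute(), and the class and method names are hypothetical. It also shows that a timed wait gives up with a TimeoutException instead of parking forever.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

public class ParkedWaitSketch {

    /** Starts a thread that blocks on the future and returns the state it settles in. */
    public static Thread.State stateWhileBlocked(final CompletableFuture<String> neverDone)
            throws InterruptedException {
        Thread waiter = new Thread(new Runnable() {
            @Override
            public void run() {
                try {
                    neverDone.get(); // untimed wait (assumed analogue of an untimed actionGet())
                } catch (Exception ignored) {
                    // interrupted below so the sketch can finish
                }
            }
        }, "blocked-waiter");
        waiter.setDaemon(true);
        waiter.start();
        // Poll for up to ~5 s until the thread has actually parked.
        for (int i = 0; i < 500 && waiter.getState() != Thread.State.WAITING; i++) {
            Thread.sleep(10);
        }
        Thread.State state = waiter.getState();
        waiter.interrupt(); // release the waiter
        return state;
    }

    /** Returns true if a timed wait on the same kind of future gives up instead of hanging. */
    public static boolean timedWaitGivesUp(CompletableFuture<String> neverDone) throws Exception {
        try {
            neverDone.get(100, TimeUnit.MILLISECONDS);
            return false;
        } catch (TimeoutException expected) {
            return true;
        }
    }

    public static void main(String[] args) throws Exception {
        System.out.println("untimed wait state: "
                + stateWhileBlocked(new CompletableFuture<String>()));
        System.out.println("timed wait gave up: "
                + timedWaitGivesUp(new CompletableFuture<String>()));
    }
}
```

A timeout only bounds the wait; it does not explain why the response never arrives, so the thread that was supposed to complete the future is the more interesting one to look for in the jstack output.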
I have been unable to reproduce the issue in my local test environment running a single node (one shard, no replicas) on Win7, but I can reproduce it on our cluster with the REST endpoint example below.
It usually requires a couple of refreshes (1-5) before hanging.
Searching for the issue has yielded little (no OOM etc.), so any input is welcome.
Test REST Endpoint:
package com.xxx.es.test.rest;

import static org.elasticsearch.rest.RestRequest.Method.GET;

import org.elasticsearch.action.ActionListener;
import org.elasticsearch.action.get.GetRequest;
import org.elasticsearch.action.get.GetResponse;
import org.elasticsearch.action.search.SearchResponse;
import org.elasticsearch.client.Client;
import org.elasticsearch.common.inject.Inject;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.rest.BaseRestHandler;
import org.elasticsearch.rest.BytesRestResponse;
import org.elasticsearch.rest.RestChannel;
import org.elasticsearch.rest.RestController;
import org.elasticsearch.rest.RestRequest;
import org.elasticsearch.rest.RestStatus;

public class TestAction extends BaseRestHandler {

    final static String EVENTS_INDEX = "allevents";
    final static String TYPE = "documents";

    // Comma-separated list of index names, read from the "indexes.events" setting
    private String eventIndices;

    @Inject
    public TestAction(Settings settings, RestController controller, Client client) {
        super(settings, controller, client);
        this.eventIndices = settings.get("indexes.events", EVENTS_INDEX);
        controller.registerHandler(GET, "/_test/{id}", this);
    }

    @Override
    protected void handleRequest(RestRequest request, RestChannel channel, Client client) throws Exception {
        final String id1 = request.param("id");
        // Asynchronous get against the first configured index
        GetRequest getRequest = new GetRequest(this.eventIndices.trim().split("\\s*,\\s*")[0], TYPE, id1);
        client.get(getRequest, new ActionListener<GetResponse>() {
            @Override
            public void onResponse(GetResponse response) {
                Thread[] threads = new Thread[2];
                for (int i = 0; i < threads.length; i++) {
                    threads[i] = new Thread(new Runnable() {
                        @Override
                        public void run() {
                            // Blocking search over all configured indices
                            SearchResponse srsp = client.prepareSearch(
                                    eventIndices.trim().split("\\s*,\\s*"))
                                    .setTypes(TYPE)
                                    .setSize(0)
                                    .execute().actionGet();
                        }
                    });
                    // Note: run() executes the Runnable on the current thread; start() would spawn a new one
                    threads[i].run();
                }
                channel.sendResponse(new BytesRestResponse(RestStatus.OK, "Done."));
            }

            @Override
            public void onFailure(Throwable e) {} // failures are silently ignored in this test endpoint
        });
    }
}
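
One detail in the endpoint above that may be worth checking: Thread.run() and Thread.start() behave differently. The following plain-JDK sketch (hypothetical class and method names, no Elasticsearch involved) records which thread actually executes the Runnable in each case. Whether this is related to the hang is an assumption that has not been verified against the cluster.

```java
public class RunVersusStartSketch {

    /** Calls run() directly and returns the name of the thread that executed the Runnable. */
    public static String executingThreadViaRun() {
        final String[] seen = new String[1];
        Thread worker = new Thread(new Runnable() {
            @Override
            public void run() {
                seen[0] = Thread.currentThread().getName();
            }
        }, "worker");
        worker.run(); // plain method call: no new thread is created
        return seen[0];
    }

    /** Calls start() and returns the name of the thread that executed the Runnable. */
    public static String executingThreadViaStart() throws InterruptedException {
        final String[] seen = new String[1];
        Thread worker = new Thread(new Runnable() {
            @Override
            public void run() {
                seen[0] = Thread.currentThread().getName();
            }
        }, "worker");
        worker.start(); // the Runnable executes on the new "worker" thread
        worker.join();  // wait for it, which also makes seen[0] visible here
        return seen[0];
    }

    public static void main(String[] args) throws InterruptedException {
        System.out.println("caller thread: " + Thread.currentThread().getName());
        System.out.println("via run():     " + executingThreadViaRun());
        System.out.println("via start():   " + executingThreadViaStart());
    }
}
```

With run(), the Runnable executes on whichever thread invoked it, so in the endpoint above the blocking actionGet() calls would happen on the thread that delivered the GetResponse to onResponse rather than on separate threads.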