ElasticSearch 2.2 Bulkload TransportException: TransportService is closed stopped can't send request

We upgraded our Elasticsearch cluster from 2.1.1 to 2.2. Our bulk load process, which uses BulkProcessor and worked with 2.1.1, now throws the following exception. Please let me know if I am missing something.

[main] INFO org.elasticsearch.plugins - [Masque] modules , plugins , sites
[main] INFO com.zu.bids.rt.ESTest - Going to execute new bulk composed of 100 actions
[elasticsearch[Masque][listener][T#1]] WARN com.zu.bids.rt.ESTest - Error executing bulk
SendRequestTransportException[[Tutinax the Mountain-Mover][elasticsearch-master-01/10.240.0.22:9300][indices:data/write/bulk]]; nested: TransportException[TransportService is closed stopped can't send request];
at org.elasticsearch.transport.TransportService.sendRequest(TransportService.java:323)
at org.elasticsearch.action.TransportActionNodeProxy.execute(TransportActionNodeProxy.java:51)
at org.elasticsearch.client.transport.support.TransportProxyClient$1.doWithNode(TransportProxyClient.java:58)
at org.elasticsearch.client.transport.TransportClientNodesService$RetryListener.onFailure(TransportClientNodesService.java:247)
at org.elasticsearch.action.ActionListenerResponseHandler.handleException(ActionListenerResponseHandler.java:46)
at org.elasticsearch.transport.TransportService$Adapter$3.run(TransportService.java:588)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
at java.lang.Thread.run(Thread.java:745)
Caused by: TransportException[TransportService is closed stopped can't send request]
at org.elasticsearch.transport.TransportService.sendRequest(TransportService.java:303)
... 8 more

Test Code is below:

public class ESTest {

private static final Logger LOG = LoggerFactory.getLogger(ESTest.class);
public static void main(String[] args) throws IOException {
    String[] esnodes = "elasticsearch-master-01:9300,elasticsearch-master-02:9300,elasticsearch-master-03:9300"
            .split(",");
    Settings settings = Settings.builder().put("cluster.name", "zuelasticsearch")
            .put("client.transport.sniff", false).put("client.transport.ping_timeout", 20, TimeUnit.SECONDS)
            .build();
    TransportClient tclient = TransportClient.builder().settings(settings).build();
    for (String node : esnodes) {
        String[] host = node.split(":");
        tclient.addTransportAddress(
                new InetSocketTransportAddress(InetAddress.getByName(host[0]), Integer.valueOf(host[1])));
    }
    Client client = tclient;
    BulkProcessor bulkProcessor = getBulkProcessor(client, 3000, 3);
    String[] strs = { "This", "is", "a", "good", "test" };
    for (int i = 0; i < 100; i++) {
        XContentBuilder jsonBuilder = XContentFactory.jsonBuilder().startObject();
        jsonBuilder.field(String.valueOf(i), strs[i % 5]);
        jsonBuilder.endObject();
        bulkProcessor
                .add(client.prepareIndex("remtest", "tello", String.valueOf(i)).setSource(jsonBuilder).request());
    }
    bulkProcessor.close();
    client.close();
}
public static BulkProcessor getBulkProcessor(Client client, int nActions, int nConcurrentRequests) {
    return BulkProcessor.builder(client, new Listener() {
        public void beforeBulk(long executionId, BulkRequest request) {
            LOG.info("Going to execute new bulk composed of {} actions", request.numberOfActions());
        }
        public void afterBulk(long executionId, BulkRequest request, BulkResponse response) {
            LOG.info("Executed bulk composed of {} actions", request.numberOfActions());
        }
        public void afterBulk(long executionId, BulkRequest request, Throwable failure) {
            LOG.warn("Error executing bulk", failure);
        }
    }).setBulkActions(nActions).setConcurrentRequests(nConcurrentRequests).build();
}

}

Short hack: add a wait period before closing the client.

The problem here is that closing the bulk processor is supposed to flush the pending bulk, but if the client is closed at the same time, the bulk request fails to execute.

I'd also add a FlushInterval to the bulk processor but it's not related to your issue.

Add an explicit awaitClose call to wait for all outstanding bulk requests, e.g.

bulkProcessor.awaitClose(30L, TimeUnit.SECONDS);
bulkProcessor.close();
client.close();
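
On picking the timeout: the value passed to awaitClose is an upper bound rather than a fixed sleep, so a generous value should cost nothing when the requests finish quickly, and the boolean it returns tells you whether everything completed in time. Below is a minimal JDK-only sketch of why the ordering matters. FakeClient, FakeProcessor and their methods are hypothetical stand-ins invented for illustration, not Elasticsearch classes, and the semaphore-based wait is an assumption about the general pattern rather than a copy of BulkProcessor's code.

```java
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Semaphore;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical stand-ins only; none of this is Elasticsearch code.
class AwaitCloseSketch {

    // Plays the role of the transport client: sending fails once it is closed.
    static final class FakeClient {
        private volatile boolean closed = false;

        void send() {
            if (closed) {
                throw new IllegalStateException("client is closed, can't send request");
            }
        }

        void close() {
            closed = true;
        }
    }

    // Plays the role of a bulk processor that sends requests on background threads.
    static final class FakeProcessor {
        private final FakeClient client;
        private final int concurrentRequests;
        private final Semaphore slots;
        private final ExecutorService pool;
        private final AtomicInteger failures = new AtomicInteger();

        FakeProcessor(FakeClient client, int concurrentRequests) {
            this.client = client;
            this.concurrentRequests = concurrentRequests;
            this.slots = new Semaphore(concurrentRequests);
            this.pool = Executors.newFixedThreadPool(concurrentRequests);
        }

        // Hands one "bulk" to a background thread; latencyMillis simulates the network.
        void flushAsync(long latencyMillis) throws InterruptedException {
            slots.acquire();
            pool.execute(() -> {
                try {
                    Thread.sleep(latencyMillis);
                    client.send();
                } catch (Exception e) {
                    failures.incrementAndGet();
                } finally {
                    slots.release();
                }
            });
        }

        // Waits up to the timeout for in-flight requests; returns early once they finish.
        boolean awaitClose(long timeout, TimeUnit unit) throws InterruptedException {
            boolean drained = slots.tryAcquire(concurrentRequests, timeout, unit);
            if (drained) {
                slots.release(concurrentRequests);
            }
            return drained;
        }

        // Sketch-only helper: lets stragglers finish so failures can be counted.
        int failuresAfterShutdown() throws InterruptedException {
            pool.shutdown();
            pool.awaitTermination(5, TimeUnit.SECONDS);
            return failures.get();
        }
    }

    // Returns how many simulated requests failed for the chosen shutdown order.
    static int failuresWhenClosing(boolean waitFirst) throws InterruptedException {
        FakeClient client = new FakeClient();
        FakeProcessor processor = new FakeProcessor(client, 3);
        processor.flushAsync(200);
        if (waitFirst) {
            // Generous upper bound; the call returns as soon as the request finishes.
            boolean drained = processor.awaitClose(5, TimeUnit.SECONDS);
            if (!drained) {
                System.err.println("Timed out with requests still in flight");
            }
        }
        client.close();
        return processor.failuresAfterShutdown();
    }

    public static void main(String[] args) throws InterruptedException {
        System.out.println("close immediately: " + failuresWhenClosing(false) + " failed");
        System.out.println("await, then close: " + failuresWhenClosing(true) + " failed");
    }
}
```

If the wait does report a timeout, closing the client anyway will still cut off whatever is in flight, so that case is worth logging instead of ignoring.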

w00t. I never realized it was documented at https://www.elastic.co/guide/en/elasticsearch/client/java-api/2.2/java-docs-bulk-processor.html.

Thanks Jorg.

@dadoonet @jprante Thanks for the answers. Your solution works most of the time, but it still fails occasionally. Is there a better way to calculate the wait time?