We upgraded our Elasticsearch cluster from 2.1.1 to 2.2. Our bulk load process, which uses BulkProcessor and worked with 2.1.1, now throws the following exception. Please let me know if I am missing something.
[main] INFO org.elasticsearch.plugins - [Masque] modules , plugins , sites
[main] INFO com.zu.bids.rt.ESTest - Going to execute new bulk composed of 100 actions
[elasticsearch[Masque][listener][T#1]] WARN com.zu.bids.rt.ESTest - Error executing bulk
SendRequestTransportException[[Tutinax the Mountain-Mover][elasticsearch-master-01/10.240.0.22:9300][indices:data/write/bulk]]; nested: TransportException[TransportService is closed stopped can't send request];
    at org.elasticsearch.transport.TransportService.sendRequest(TransportService.java:323)
    at org.elasticsearch.action.TransportActionNodeProxy.execute(TransportActionNodeProxy.java:51)
    at org.elasticsearch.client.transport.support.TransportProxyClient$1.doWithNode(TransportProxyClient.java:58)
    at org.elasticsearch.client.transport.TransportClientNodesService$RetryListener.onFailure(TransportClientNodesService.java:247)
    at org.elasticsearch.action.ActionListenerResponseHandler.handleException(ActionListenerResponseHandler.java:46)
    at org.elasticsearch.transport.TransportService$Adapter$3.run(TransportService.java:588)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
    at java.lang.Thread.run(Thread.java:745)
Caused by: TransportException[TransportService is closed stopped can't send request]
    at org.elasticsearch.transport.TransportService.sendRequest(TransportService.java:303)
    ... 8 more
The test code is below:
public class ESTest {

    private static final Logger LOG = LoggerFactory.getLogger(ESTest.class);

    public static void main(String[] args) throws IOException {
        String[] esnodes = "elasticsearch-master-01:9300,elasticsearch-master-02:9300,elasticsearch-master-03:9300"
                .split(",");
        Settings settings = Settings.builder()
                .put("cluster.name", "zuelasticsearch")
                .put("client.transport.sniff", false)
                .put("client.transport.ping_timeout", 20, TimeUnit.SECONDS)
                .build();

        TransportClient tclient = TransportClient.builder().settings(settings).build();
        for (String node : esnodes) {
            String[] host = node.split(":");
            tclient.addTransportAddress(
                    new InetSocketTransportAddress(InetAddress.getByName(host[0]), Integer.valueOf(host[1])));
        }

        Client client = tclient;
        BulkProcessor bulkProcessor = getBulkProcessor(client, 3000, 3);

        String[] strs = { "This", "is", "a", "good", "test" };
        for (int i = 0; i < 100; i++) {
            XContentBuilder jsonBuilder = XContentFactory.jsonBuilder().startObject();
            jsonBuilder.field(String.valueOf(i), strs[i % 5]);
            jsonBuilder.endObject();
            bulkProcessor.add(
                    client.prepareIndex("remtest", "tello", String.valueOf(i)).setSource(jsonBuilder).request());
        }

        bulkProcessor.close();
        client.close();
    }

    public static BulkProcessor getBulkProcessor(Client client, int nActions, int nConcurrentRequests) {
        return BulkProcessor.builder(client, new Listener() {
            public void beforeBulk(long executionId, BulkRequest request) {
                LOG.info("Going to execute new bulk composed of {} actions", request.numberOfActions());
            }

            public void afterBulk(long executionId, BulkRequest request, BulkResponse response) {
                LOG.info("Executed bulk composed of {} actions", request.numberOfActions());
            }

            public void afterBulk(long executionId, BulkRequest request, Throwable failure) {
                LOG.warn("Error executing bulk", failure);
            }
        }).setBulkActions(nActions).setConcurrentRequests(nConcurrentRequests).build();
    }
}