ref url in github:
env:
CentOS 7, Java 8, Spring Boot, Maven, Elasticsearch high-level REST client jar
Elasticsearch 6.7, running in Docker on an OSS service
background:
This is a data-handler node that runs scheduled data sync jobs: it loads data from MySQL, processes it, and then bulk-inserts the results into a target index in ES.
issue:
I build a single (singleton) ES client when the application starts:
try {
    String template = "%s:%s";
    // One HttpHost per comma-separated address. The stream is sequential on purpose:
    // with .parallel(), the add() side effect below is only safe if esHttpAddress
    // is a thread-safe collection, and the insertion order is not guaranteed.
    HttpHost[] httpHosts = Arrays.stream(ES_ADDRESSES.split(",")).map(x -> {
        esHttpAddress.add(String.format(template, x, ES_HTTP_PORT));
        return new HttpHost(x, ES_HTTP_PORT, "http");
    }).toArray(HttpHost[]::new);

    // Low-level REST client settings: connect, socket, and pool-lease timeouts.
    RestClientBuilder builder = RestClient.builder(httpHosts)
            .setRequestConfigCallback((RequestConfig.Builder requestConfigBuilder) ->
                    requestConfigBuilder.setConnectTimeout(ES_CONNECT_TIMEOUT)
                            .setSocketTimeout(ES_SOCKET_TIMEOUT)
                            .setConnectionRequestTimeout(ES_CONNECTION_REQUEST_TIMEOUT))
            .setMaxRetryTimeoutMillis(ES_MAX_RETRY_TINEOUT_MILLIS);
    restHighLevelClient = new RestHighLevelClient(builder);

    // The bulk processor flushes when any threshold is hit: action count, payload size, or interval.
    bulkProcessor = BulkProcessor.builder((request, bulkListener) ->
                    restHighLevelClient.bulkAsync(request, COMMON_OPTIONS, bulkListener),
            getBPListener())
            .setBulkActions(ES_BULK_FLUSH)
            .setBulkSize(new ByteSizeValue(ES_BULK_SIZE, ByteSizeUnit.MB))
            .setFlushInterval(TimeValue.timeValueSeconds(10L))
            .setConcurrentRequests(ES_BULK_CONCURRENT)
            // Retry rejected bulk requests up to 3 times, waiting 1 second between attempts.
            .setBackoffPolicy(BackoffPolicy.constantBackoff(TimeValue.timeValueSeconds(1L), 3))
            .build();
} catch (Exception e) {
    // Pass the exception to the logger so the full stack trace lands in the application log.
    log.error("Error while initializing the ES REST client", e);
    throw new MainServiceException(ResultEnum.ES_CLIENT_INIT);
}
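For anyone who wants to reproduce just the address handling from the snippet above without an ES dependency, here is a minimal standalone sketch. The class name HostListSketch, the method parseHosts, and the sample host names are hypothetical and not part of my application; it only illustrates turning a comma-separated address list into "host:port" strings with a sequential stream, so no shared list is mutated from multiple threads.

```java
import java.util.Arrays;
import java.util.List;
import java.util.stream.Collectors;

// Hypothetical helper for illustration only; not part of the original application.
class HostListSketch {

    // Splits a comma-separated address list and formats each entry as "host:port".
    // The stream is sequential and collects into a new list instead of mutating
    // a shared one, so the result keeps the input order.
    static List<String> parseHosts(String addresses, int port) {
        return Arrays.stream(addresses.split(","))
                .map(String::trim)                      // tolerate spaces after commas
                .filter(host -> !host.isEmpty())        // skip empty entries such as "a,,b"
                .map(host -> String.format("%s:%d", host, port))
                .collect(Collectors.toList());
    }

    public static void main(String[] args) {
        // Sample host names are made up for the example.
        System.out.println(parseHosts("es-node-1, es-node-2", 9200));
    }
}
```

The same list could then be mapped to HttpHost objects in a second step, which keeps the host parsing separate from the client construction.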
After that, every save operation goes through:
bulkProcessor.add((DocWriteRequest) request);
But when I start the data sync jobs, a java.lang.IllegalStateException is thrown, and its message says the cause is I/O STOPPED.
Does this mean that the bulkProcessor or restHighLevelClient has an upper limit on the number of requests, and that it closes the connection when there are too many?