I am pushing data to elasticsearch using bulk api RestClient. When I am pushing 1Lac records I am not getting any exception. But above 1Lac records I am getting below exception.
An established connection was aborted by the software in your host machine
java.io.IOException: An established connection was aborted by the software in your host machine
at sun.nio.ch.SocketDispatcher.write0(Native Method)
at sun.nio.ch.SocketDispatcher.write(Unknown Source)
at sun.nio.ch.IOUtil.writeFromNativeBuffer(Unknown Source)
at sun.nio.ch.IOUtil.write(Unknown Source)
at sun.nio.ch.SocketChannelImpl.write(Unknown Source)
at org.apache.http.impl.nio.codecs.AbstractContentEncoder.doWriteChunk(AbstractContentEncoder.java:171)
at org.apache.http.impl.nio.codecs.AbstractContentEncoder.doWriteChunk(AbstractContentEncoder.java:164)
at org.apache.http.impl.nio.codecs.AbstractContentEncoder.writeToChannel(AbstractContentEncoder.java:138)
at org.apache.http.impl.nio.codecs.LengthDelimitedEncoder.write(LengthDelimitedEncoder.java:130)
at org.apache.http.nio.entity.NStringEntity.produceContent(NStringEntity.java:174)
at org.apache.http.nio.protocol.BasicAsyncRequestProducer.produceContent(BasicAsyncRequestProducer.java:125)
at org.apache.http.impl.nio.client.MainClientExec.produceContent(MainClientExec.java:262)
at org.apache.http.impl.nio.client.DefaultClientExchangeHandlerImpl.produceContent(DefaultClientExchangeHandlerImpl.java:136)
at org.apache.http.nio.protocol.HttpAsyncRequestExecutor.outputReady(HttpAsyncRequestExecutor.java:241)
at org.apache.http.impl.nio.DefaultNHttpClientConnection.produceOutput(DefaultNHttpClientConnection.java:290)
at org.apache.http.impl.nio.client.InternalIODispatch.onOutputReady(InternalIODispatch.java:86)
at org.apache.http.impl.nio.client.InternalIODispatch.onOutputReady(InternalIODispatch.java:39)
at org.apache.http.impl.nio.reactor.AbstractIODispatch.outputReady(AbstractIODispatch.java:145)
at org.apache.http.impl.nio.reactor.BaseIOReactor.writable(BaseIOReactor.java:188)
at org.apache.http.impl.nio.reactor.AbstractIOReactor.processEvent(AbstractIOReactor.java:341)
at org.apache.http.impl.nio.reactor.AbstractIOReactor.processEvents(AbstractIOReactor.java:315)
at org.apache.http.impl.nio.reactor.AbstractIOReactor.execute(AbstractIOReactor.java:276)
at org.apache.http.impl.nio.reactor.BaseIOReactor.execute(BaseIOReactor.java:104)
at org.apache.http.impl.nio.reactor.AbstractMultiworkerIOReactor$Worker.run(AbstractMultiworkerIOReactor.java:588)
at java.lang.Thread.run(Unknown Source)
DEBUG [I/O dispatcher 25] RestClient.onFailure(483) | added host [http://172.21.153.176:9200] to blacklist
Below is my source code.
RestClient restClient = RestClient.builder(new HttpHost(elasticsearch_host, 9200, "http")).setRequestConfigCallback(new RestClientBuilder.RequestConfigCallback() {
@Override
public RequestConfig.Builder customizeRequestConfig(RequestConfig.Builder requestConfigBuilder) {
return requestConfigBuilder.setConnectTimeout(60000*5)
.setSocketTimeout(60000*5);
}
}).setMaxRetryTimeoutMillis(60000*5).build();
String actionMetaData = String.format("{ \"index\" : { \"_index\" : \"%s\", \"_type\" : \"%s\" } }%n", uploadedFileIndex, "indextype");
JSONObject obj = new JSONObject();
FileInputStream file = null;
Workbook workbook = null;
List<Object> colList = new ArrayList<Object>();
List<String> bulkData = new ArrayList<String>();
StringBuilder bulkRequestBody = new StringBuilder();
file = new FileInputStream(new File(filePath));
workbook = new Workbook(file);
Worksheet worksheet = workbook.getWorksheets().get(0);
Cells cells = worksheet.getCells();
Range range = cells.getMaxDisplayRange();
int totalColumns = range.getColumnCount();
RowCollection rows = cells.getRows();
for(int i = 0; i<=0 ; i++)
{
for (int j = 0; j < totalColumns; j++)
{
colList.add(cells.get(i, j).getValue());
}
}
if(colList.size()==0)
{
returnMap.put("Status", "Column List Empty");
return returnMap;
}
for (int i = 1; i < rows.getCount(); i++)
{
for (int j = 0; j < totalColumns; j++)
{
String key = (String)colList.get(j);
if(null==key || "".equals(key))
key = "Column"+j;
obj.put(key, cells.get(i, j).getValue());
}
obj.put("Action_Date", dateFormat.format(new Date()));
String jsonString = obj.toJSONString();
bulkData.add(jsonString);
obj.clear();
}
for (String bulkItem : bulkData) {
bulkRequestBody.append(actionMetaData);
bulkRequestBody.append(bulkItem);
bulkRequestBody.append("\n");
}
String jsonString = bulkRequestBody.toString();
HttpEntity entity = new NStringEntity(jsonString, ContentType.APPLICATION_JSON);
Response indexResponse = restClient.performRequest(
"PUT",
"/"+uploadedFileIndex+"/indextype/_bulk",
Collections.<String, String>emptyMap(),
entity);
//System.out.println(EntityUtils.toString(indexResponse.getEntity()));
if(indexResponse.getStatusLine().getStatusCode() == HttpStatus.SC_OK)
{
returnMap.put("Status", "Index Created");
System.out.println("Index Successfully Created");
return returnMap;
}