HI,
I have one eod job to dump over 240000 objects to ES, but I got this erro message, any idea?
ERROR [11:27:07.777] [I/O dispatcher 1] b.g.d.s.t.e.e.BulkProcessorManager - Failed to execute bulk
java.util.concurrent.TimeoutException: null
at org.apache.http.nio.pool.AbstractNIOConnPool.processPendingRequest(AbstractNIOConnPool.java:364) ~[TradeSyncRS-3.0.0.jar:3.0.0]
at org.apache.http.nio.pool.AbstractNIOConnPool.processNextPendingRequest(AbstractNIOConnPool.java:344) ~[TradeSyncRS-3.0.0.jar:3.0.0]
at org.apache.http.nio.pool.AbstractNIOConnPool.release(AbstractNIOConnPool.java:318) ~[TradeSyncRS-3.0.0.jar:3.0.0]
at org.apache.http.impl.nio.conn.PoolingNHttpClientConnectionManager.releaseConnection(PoolingNHttpClientConnectionManager.java:303) [TradeSyncRS-3.0.0.jar:3.0.0]
at org.apache.http.impl.nio.client.AbstractClientExchangeHandler.releaseConnection(AbstractClientExchangeHandler.java:239) [TradeSyncRS-3.0.0.jar:3.0.0]
at org.apache.http.impl.nio.client.MainClientExec.responseCompleted(MainClientExec.java:387) [TradeSyncRS-3.0.0.jar:3.0.0]
at org.apache.http.impl.nio.client.DefaultClientExchangeHandlerImpl.responseCompleted(DefaultClientExchangeHandlerImpl.java:168) [TradeSyncRS-3.0.0.jar:3.0.0]
at org.apache.http.nio.protocol.HttpAsyncRequestExecutor.processResponse(HttpAsyncRequestExecutor.java:436) [TradeSyncRS-3.0.0.jar:3.0.0]
at org.apache.http.nio.protocol.HttpAsyncRequestExecutor.inputReady(HttpAsyncRequestExecutor.java:326) [TradeSyncRS-3.0.0.jar:3.0.0]
at org.apache.http.impl.nio.DefaultNHttpClientConnection.consumeInput(DefaultNHttpClientConnection.java:265) [TradeSyncRS-3.0.0.jar:3.0.0]
at org.apache.http.impl.nio.client.InternalIODispatch.onInputReady(InternalIODispatch.java:81) [TradeSyncRS-3.0.0.jar:3.0.0]
at org.apache.http.impl.nio.client.InternalIODispatch.onInputReady(InternalIODispatch.java:39) [TradeSyncRS-3.0.0.jar:3.0.0]
at org.apache.http.impl.nio.reactor.AbstractIODispatch.inputReady(AbstractIODispatch.java:114) [TradeSyncRS-3.0.0.jar:3.0.0]
at org.apache.http.impl.nio.reactor.BaseIOReactor.readable(BaseIOReactor.java:162) [TradeSyncRS-3.0.0.jar:3.0.0]
at org.apache.http.impl.nio.reactor.AbstractIOReactor.processEvent(AbstractIOReactor.java:337) [TradeSyncRS-3.0.0.jar:3.0.0]
at org.apache.http.impl.nio.reactor.AbstractIOReactor.processEvents(AbstractIOReactor.java:315) [TradeSyncRS-3.0.0.jar:3.0.0]
at org.apache.http.impl.nio.reactor.AbstractIOReactor.execute(AbstractIOReactor.java:276) [TradeSyncRS-3.0.0.jar:3.0.0]
at org.apache.http.impl.nio.reactor.BaseIOReactor.execute(BaseIOReactor.java:104) [TradeSyncRS-3.0.0.jar:3.0.0]
at org.apache.http.impl.nio.reactor.AbstractMultiworkerIOReactor$Worker.run(AbstractMultiworkerIOReactor.java:588) [TradeSyncRS-3.0.0.jar:3.0.0]
at java.lang.Thread.run(Thread.java:745) [na:1.8.0_92]
code>>>>>
public class BulkProcessorManager {
private static final Logger log = LoggerFactory.getLogger(BulkProcessorManager.class);
private RestHighLevelClient client;
private int bulkAction = 10;
private int concurrentReq = 10;
private int bulkSize = 5;
private int flushInterval = 5;
private int timeout = 5;
private BulkProcessor bulkProcessor = null;
public BulkProcessorManager(RestHighLevelClient client)
{
this.client = client;
bulkProcessor = createBulkProcessor();
}
private BulkProcessor createBulkProcessor(){
BulkProcessor.Listener listener = new BulkProcessor.Listener() {
public void afterBulk(long executionId, BulkRequest arg1, BulkResponse response) {
if (response.hasFailures()) {
log.warn("Bulk [{}] executed with failures", executionId);
} else {
log.info("Bulk [{}] completed in {} milliseconds", executionId, response.getTook().getMillis());
}
}
@Override
public void afterBulk(long executionId, BulkRequest request, Throwable failure) {
log.error("Failed to execute bulk", failure);
}
@Override
public void beforeBulk(long executionId, BulkRequest request) {
int numberOfActions = request.numberOfActions();
log.info("Executing bulk [{}] with {} requests", executionId, numberOfActions);
}
};
ThreadPool threadPool = new ThreadPool(
Settings.builder().put(Node.NODE_NAME_SETTING.getKey(), "high-level-client").build());
BulkProcessor bulkProcessor = new BulkProcessor.Builder(client::bulkAsync, listener, threadPool)
.setBulkActions(bulkAction).setConcurrentRequests(concurrentReq)
.setBulkSize(new ByteSizeValue(bulkSize, ByteSizeUnit.MB))
.setFlushInterval(TimeValue.timeValueSeconds(flushInterval)).build();
return bulkProcessor;
}
public void executeBulkRequests(List<UpdateRequest> updateRequestList) throws Exception
{
if ( bulkProcessor == null )
bulkProcessor = createBulkProcessor();
try {
for (UpdateRequest updateRequest : updateRequestList) {
bulkProcessor.add(updateRequest);
}
} finally {
bulkProcessor.awaitClose(timeout, TimeUnit.MINUTES);
bulkProcessor = null;
}
}
public void executeBulkDeleteRequests(List<DeleteRequest> deleteRequestList) throws Exception
{
bulkProcessor = null;
try {
bulkProcessor = createBulkProcessor();
for (DeleteRequest deleteRequest : deleteRequestList) {
bulkProcessor.add(deleteRequest);
}
} finally {
bulkProcessor.awaitClose(timeout, TimeUnit.MINUTES);
bulkProcessor = null;
}
}
public void addDeleteRequest(DeleteRequest request)
{
if ( bulkProcessor == null )
bulkProcessor = createBulkProcessor();
bulkProcessor.add(request);
}
public void addRequest(IndexRequest trade)
{
if ( bulkProcessor == null )
bulkProcessor = createBulkProcessor();
bulkProcessor.add(trade);
}
public void close()
{
if ( bulkProcessor != null )
{
try {
bulkProcessor.awaitClose(timeout, TimeUnit.MINUTES);
} catch (InterruptedException e) {
log.error("Failed to close bulk processor.", e);
}
}
}
}