BulkProcessor timeout

Hi,

I have an end-of-day (EOD) job that dumps over 240,000 objects to ES, but I get the error message below. Any ideas?
ERROR [11:27:07.777] [I/O dispatcher 1] b.g.d.s.t.e.e.BulkProcessorManager - Failed to execute bulk
java.util.concurrent.TimeoutException: null
at org.apache.http.nio.pool.AbstractNIOConnPool.processPendingRequest(AbstractNIOConnPool.java:364) ~[TradeSyncRS-3.0.0.jar:3.0.0]
at org.apache.http.nio.pool.AbstractNIOConnPool.processNextPendingRequest(AbstractNIOConnPool.java:344) ~[TradeSyncRS-3.0.0.jar:3.0.0]
at org.apache.http.nio.pool.AbstractNIOConnPool.release(AbstractNIOConnPool.java:318) ~[TradeSyncRS-3.0.0.jar:3.0.0]
at org.apache.http.impl.nio.conn.PoolingNHttpClientConnectionManager.releaseConnection(PoolingNHttpClientConnectionManager.java:303) [TradeSyncRS-3.0.0.jar:3.0.0]
at org.apache.http.impl.nio.client.AbstractClientExchangeHandler.releaseConnection(AbstractClientExchangeHandler.java:239) [TradeSyncRS-3.0.0.jar:3.0.0]
at org.apache.http.impl.nio.client.MainClientExec.responseCompleted(MainClientExec.java:387) [TradeSyncRS-3.0.0.jar:3.0.0]
at org.apache.http.impl.nio.client.DefaultClientExchangeHandlerImpl.responseCompleted(DefaultClientExchangeHandlerImpl.java:168) [TradeSyncRS-3.0.0.jar:3.0.0]
at org.apache.http.nio.protocol.HttpAsyncRequestExecutor.processResponse(HttpAsyncRequestExecutor.java:436) [TradeSyncRS-3.0.0.jar:3.0.0]
at org.apache.http.nio.protocol.HttpAsyncRequestExecutor.inputReady(HttpAsyncRequestExecutor.java:326) [TradeSyncRS-3.0.0.jar:3.0.0]
at org.apache.http.impl.nio.DefaultNHttpClientConnection.consumeInput(DefaultNHttpClientConnection.java:265) [TradeSyncRS-3.0.0.jar:3.0.0]
at org.apache.http.impl.nio.client.InternalIODispatch.onInputReady(InternalIODispatch.java:81) [TradeSyncRS-3.0.0.jar:3.0.0]
at org.apache.http.impl.nio.client.InternalIODispatch.onInputReady(InternalIODispatch.java:39) [TradeSyncRS-3.0.0.jar:3.0.0]
at org.apache.http.impl.nio.reactor.AbstractIODispatch.inputReady(AbstractIODispatch.java:114) [TradeSyncRS-3.0.0.jar:3.0.0]
at org.apache.http.impl.nio.reactor.BaseIOReactor.readable(BaseIOReactor.java:162) [TradeSyncRS-3.0.0.jar:3.0.0]
at org.apache.http.impl.nio.reactor.AbstractIOReactor.processEvent(AbstractIOReactor.java:337) [TradeSyncRS-3.0.0.jar:3.0.0]
at org.apache.http.impl.nio.reactor.AbstractIOReactor.processEvents(AbstractIOReactor.java:315) [TradeSyncRS-3.0.0.jar:3.0.0]
at org.apache.http.impl.nio.reactor.AbstractIOReactor.execute(AbstractIOReactor.java:276) [TradeSyncRS-3.0.0.jar:3.0.0]
at org.apache.http.impl.nio.reactor.BaseIOReactor.execute(BaseIOReactor.java:104) [TradeSyncRS-3.0.0.jar:3.0.0]
at org.apache.http.impl.nio.reactor.AbstractMultiworkerIOReactor$Worker.run(AbstractMultiworkerIOReactor.java:588) [TradeSyncRS-3.0.0.jar:3.0.0]
at java.lang.Thread.run(Thread.java:745) [na:1.8.0_92]

code>>>>>
/**
 * Owns an Elasticsearch {@link BulkProcessor} built on top of a
 * {@link RestHighLevelClient} and exposes helpers to queue index, update and
 * delete requests.
 *
 * <p>The processor is created lazily: after {@link #executeBulkRequests(List)},
 * {@link #executeBulkDeleteRequests(List)} or {@link #close()} has closed it, the
 * next call that needs a processor builds a fresh one. The {@link ThreadPool}
 * handed to the processor builder is created once, shared by every processor this
 * manager builds, and terminated in {@link #close()}.
 *
 * <p>This class performs no synchronization of its own.
 */
public class BulkProcessorManager {
    private static final Logger log = LoggerFactory.getLogger(BulkProcessorManager.class);

    private final RestHighLevelClient client;

    // NOTE(review): with only 10 actions per bulk, a load of a few hundred thousand
    // documents is split into tens of thousands of small HTTP requests, up to 10 of
    // them in flight at once. The TimeoutException raised from the HTTP connection
    // pool may be a symptom of that; consider a larger batch size and/or fewer
    // concurrent requests - TODO confirm against the client's pool configuration.
    private final int bulkAction = 10;      // actions per bulk request
    private final int concurrentReq = 10;   // bulk requests allowed in flight
    private final int bulkSize = 5;         // flush threshold, in megabytes
    private final int flushInterval = 5;    // periodic flush, in seconds
    private final int timeout = 5;          // close timeout, in minutes

    /** Shared by all processors built by this manager; terminated in {@link #close()}. */
    private ThreadPool threadPool = null;
    private BulkProcessor bulkProcessor = null;

    /**
     * Creates the manager and eagerly builds its first bulk processor.
     *
     * @param client client whose {@code bulkAsync} method executes the bulk requests
     */
    public BulkProcessorManager(RestHighLevelClient client) {
        this.client = client;
        bulkProcessor = createBulkProcessor();
    }

    /**
     * Builds a new bulk processor that logs before and after every bulk execution.
     * The thread pool is created on first use and reused afterwards, so repeatedly
     * rebuilding the processor does not leak one thread pool per rebuild.
     *
     * @return a freshly built processor
     */
    private BulkProcessor createBulkProcessor() {
        BulkProcessor.Listener listener = new BulkProcessor.Listener() {
            @Override
            public void beforeBulk(long executionId, BulkRequest request) {
                int numberOfActions = request.numberOfActions();
                log.info("Executing bulk [{}] with {} requests", executionId, numberOfActions);
            }

            @Override
            public void afterBulk(long executionId, BulkRequest request, BulkResponse response) {
                if (response.hasFailures()) {
                    // Include the per-item failure details; without them the
                    // warning gives no hint about what actually went wrong.
                    log.warn("Bulk [{}] executed with failures: {}", executionId,
                            response.buildFailureMessage());
                } else {
                    log.info("Bulk [{}] completed in {} milliseconds", executionId,
                            response.getTook().getMillis());
                }
            }

            @Override
            public void afterBulk(long executionId, BulkRequest request, Throwable failure) {
                log.error("Failed to execute bulk", failure);
            }
        };

        if (threadPool == null) {
            threadPool = new ThreadPool(
                    Settings.builder().put(Node.NODE_NAME_SETTING.getKey(), "high-level-client").build());
        }

        return new BulkProcessor.Builder(client::bulkAsync, listener, threadPool)
                .setBulkActions(bulkAction)
                .setConcurrentRequests(concurrentReq)
                .setBulkSize(new ByteSizeValue(bulkSize, ByteSizeUnit.MB))
                .setFlushInterval(TimeValue.timeValueSeconds(flushInterval))
                .build();
    }

    /** Returns the current processor, building a new one if none is open. */
    private BulkProcessor currentProcessor() {
        if (bulkProcessor == null) {
            bulkProcessor = createBulkProcessor();
        }
        return bulkProcessor;
    }

    /**
     * Closes the current processor, if any, waiting up to {@code timeout} minutes
     * for queued and in-flight bulk requests. The field is cleared before waiting
     * so that a closed processor is never reused, even if the wait is interrupted.
     *
     * @throws InterruptedException if the calling thread is interrupted while waiting
     */
    private void closeProcessor() throws InterruptedException {
        BulkProcessor processor = bulkProcessor;
        bulkProcessor = null;
        if (processor == null) {
            return;
        }
        if (!processor.awaitClose(timeout, TimeUnit.MINUTES)) {
            log.warn("Bulk processor did not complete all pending requests within {} minutes", timeout);
        }
    }

    /**
     * Queues every update request, then closes the processor, waiting for the
     * queued requests to be executed.
     *
     * @param updateRequestList requests to queue
     * @throws Exception if queuing fails or the wait for completion is interrupted
     */
    public void executeBulkRequests(List<UpdateRequest> updateRequestList) throws Exception {
        BulkProcessor processor = currentProcessor();
        try {
            for (UpdateRequest updateRequest : updateRequestList) {
                processor.add(updateRequest);
            }
        } finally {
            closeProcessor();
        }
    }

    /**
     * Queues every delete request, then closes the processor, waiting for the
     * queued requests to be executed. An already open processor is reused (and
     * therefore flushed and closed) rather than being dropped without a close.
     *
     * @param deleteRequestList requests to queue
     * @throws Exception if queuing fails or the wait for completion is interrupted
     */
    public void executeBulkDeleteRequests(List<DeleteRequest> deleteRequestList) throws Exception {
        BulkProcessor processor = currentProcessor();
        try {
            for (DeleteRequest deleteRequest : deleteRequestList) {
                processor.add(deleteRequest);
            }
        } finally {
            closeProcessor();
        }
    }

    /**
     * Queues a single delete request on the current processor.
     *
     * @param request delete request to queue
     */
    public void addDeleteRequest(DeleteRequest request) {
        currentProcessor().add(request);
    }

    /**
     * Queues a single index request on the current processor.
     *
     * @param trade index request to queue
     */
    public void addRequest(IndexRequest trade) {
        currentProcessor().add(trade);
    }

    /**
     * Closes the current processor (waiting for pending requests) and terminates
     * the shared thread pool. The manager can still be used afterwards; a new
     * processor and thread pool are then created on demand.
     */
    public void close() {
        try {
            closeProcessor();
        } catch (InterruptedException e) {
            // Restore the flag so callers further up can observe the interruption.
            Thread.currentThread().interrupt();
            log.error("Failed to close bulk processor.", e);
        } finally {
            if (threadPool != null) {
                ThreadPool.terminate(threadPool, timeout, TimeUnit.MINUTES);
                threadPool = null;
            }
        }
    }

}

This topic was automatically closed 28 days after the last reply. New replies are no longer allowed.