Identify, save and resend failed requests in BulkProcessor

I am using BulkProcessor to index data into Elasticsearch 5.2. Some of the items in my bulk requests fail, and response.hasFailures() returns true. How do I extract the failed requests, save them to a file, and resend them in another request?

My BulkProcessor is built like this:

BulkProcessor bulkProcessor = BulkProcessor.builder(client, new BulkProcessor.Listener() {
    @Override
    public void afterBulk(long executionId, BulkRequest request, BulkResponse response) {
        System.out.println("Bulk execution completed [" + executionId + "].\n" +
                "Took (ms): " + response.getTookInMillis() + "\n" +
                "Failures: " + response.hasFailures() + "\n" +
                "Count: " + response.getItems().length);
    }

    @Override
    public void afterBulk(long executionId, BulkRequest request, Throwable failure) {
        System.out.println("Bulk execution failed [" + executionId + "].\n" + failure.toString());
    }

    @Override
    public void beforeBulk(long executionId, BulkRequest request) {
    }
})
.setBulkActions(-1)
.setBulkSize(new ByteSizeValue(15, ByteSizeUnit.MB))
.setFlushInterval(TimeValue.timeValueSeconds(10))
.setConcurrentRequests(0)
.build();

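You need to maintain a Map of the operations you sent to the bulk, for example keyed by the executionId passed to the listener.

If the bulk succeeds, remove its operations from the map. If not, retrieve the failed operations from the map so you can save or resend them.

Note that this map can consume a fair amount of heap, since every operation stays in memory until its bulk completes.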
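
A minimal sketch of that bookkeeping, in plain Java with no Elasticsearch dependency so it can be run on its own. PendingBulkTracker and all of its methods are hypothetical names, not part of the Elasticsearch API, and the sketch assumes each operation is kept as one JSON string and that the items in the bulk response come back in the same order as the operations in the request.

```java
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.StandardOpenOption;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

// Hypothetical helper: remembers what was sent in each bulk execution.
class PendingBulkTracker {
    // executionId -> operations sent in that bulk (assumed: one JSON document per entry)
    private final Map<Long, List<String>> pending = new ConcurrentHashMap<>();

    // Intended to be called from beforeBulk(executionId, request).
    void register(long executionId, List<String> operations) {
        pending.put(executionId, new ArrayList<>(operations));
    }

    // Intended to be called from afterBulk(executionId, request, response).
    // failed[i] is assumed to hold response.getItems()[i].isFailed().
    // Removes the bulk from the map and returns only the operations that failed.
    List<String> complete(long executionId, boolean[] failed) {
        List<String> sent = pending.remove(executionId);
        List<String> toRetry = new ArrayList<>();
        if (sent == null) {
            return toRetry; // unknown execution: nothing to retry
        }
        for (int i = 0; i < sent.size() && i < failed.length; i++) {
            if (failed[i]) {
                toRetry.add(sent.get(i));
            }
        }
        return toRetry;
    }

    // Intended to be called from afterBulk(executionId, request, Throwable):
    // the whole bulk failed, so every operation in it is returned for retry.
    List<String> completeAllFailed(long executionId) {
        List<String> sent = pending.remove(executionId);
        return sent == null ? new ArrayList<>() : sent;
    }

    // Number of bulks still waiting for a response (a rough gauge of heap usage).
    int pendingCount() {
        return pending.size();
    }

    // Appends the failed operations to a file, one per line, for a later resend.
    static void appendToFile(Path file, List<String> operations) throws IOException {
        Files.write(file, operations, StandardCharsets.UTF_8,
                StandardOpenOption.CREATE, StandardOpenOption.APPEND);
    }
}
```

In the listener from the question, register would be called in beforeBulk, complete in the afterBulk that receives a BulkResponse, and completeAllFailed in the afterBulk that receives a Throwable. The returned list can then be written out with appendToFile or added back to the BulkProcessor. Since afterBulk also receives the original BulkRequest, matching response items to the request by position may be enough in simple cases without a separate map.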
