I'm using the BulkProcessor to index documents in Elasticsearch. It has
definitely improved my indexing throughput.

Are there any best practices for exception handling with the bulk
processor? For example, it would be useful to schedule retries in certain
scenarios.

At the moment all I'm doing is logging. Could someone point me to a
resource with an example of handling a NodeNotConnectedException and
retrying? I don't know how to access the contents of the bulkProcessor
from within the afterBulk method in the Listener.

    @Override
    public void beforeBulk(long executionId, BulkRequest bulkRequest) {
        // Nothing to do before the bulk request is sent.
    }

    // Called when the bulk request completed, possibly with per-item failures.
    @Override
    public void afterBulk(long executionId, BulkRequest bulkRequest, BulkResponse bulkResponse) {
        if (bulkResponse.hasFailures()) {
            Log.error("We have failures");
            for (BulkItemResponse bulkItemResponse : bulkResponse.getItems()) {
                if (bulkItemResponse.isFailed()) {
                    Log.error(bulkItemResponse.getId() + " failed with message: "
                            + bulkItemResponse.getFailureMessage());
                }
            }
        }
    }

    // Called when the whole bulk request failed with an exception.
    @Override
    public void afterBulk(long executionId, BulkRequest bulkRequest, Throwable t) {
        Log.error("An exception occurred while indexing", t);
        // How do I add this back to the list of requests?
    }
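
To make the question concrete, here is a minimal, library-free sketch of the retry pattern in question: keep a copy of the failed batch, resend it a bounded number of times, and back off between attempts. Everything in it is an assumption for illustration. `RetryingBatchSender`, its constructor parameters, and `sendWithRetry` are hypothetical names, not Elasticsearch API, and the `Consumer` stands in for whatever call actually sends the bulk request.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;
import java.util.function.Predicate;

// Hypothetical helper for illustration only; not part of any Elasticsearch API.
final class RetryingBatchSender<T> {
    private final Consumer<List<T>> sender;              // stand-in for the real bulk call
    private final int maxAttempts;                       // total tries, including the first
    private final long initialBackoffMillis;             // doubled after each failed try
    private final Predicate<RuntimeException> retryable; // which failures are worth retrying

    RetryingBatchSender(Consumer<List<T>> sender, int maxAttempts,
                        long initialBackoffMillis, Predicate<RuntimeException> retryable) {
        this.sender = sender;
        this.maxAttempts = maxAttempts;
        this.initialBackoffMillis = initialBackoffMillis;
        this.retryable = retryable;
    }

    // Sends the batch, retrying retryable failures with exponential backoff.
    // Returns the number of attempts used, or rethrows the last failure when
    // the failure is not retryable or the attempts are exhausted.
    int sendWithRetry(List<T> batch) {
        List<T> copy = new ArrayList<>(batch); // keep our own copy of the batch
        long backoff = initialBackoffMillis;
        for (int attempt = 1; ; attempt++) {
            try {
                sender.accept(copy);
                return attempt;
            } catch (RuntimeException e) {
                if (attempt >= maxAttempts || !retryable.test(e)) {
                    throw e;
                }
                sleep(backoff);
                backoff *= 2;
            }
        }
    }

    // Sleeps for the backoff period, preserving the interrupt flag if interrupted.
    private static void sleep(long millis) {
        try {
            Thread.sleep(millis);
        } catch (InterruptedException ie) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException("interrupted while backing off", ie);
        }
    }
}
```

In a listener like the one above, the batch to resend would presumably come from the `bulkRequest` argument that `afterBulk` already receives, and the predicate would check for exceptions such as NodeNotConnectedException. Whether it is safe to block or resubmit from inside the callback is exactly the open part of the question, so this sketch only shows the retry loop itself.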