Using BulkProcessor and rethrowing an exception from the afterBulk() method
should not be recommended at all. Currently, doing so closes down the
transport, and every subsequent request fails with NoNodeAvailableException.
We learned this the hard way, and the workaround/fix right now is simply to
NOT throw exceptions.
It would be good if there were a best-practices guide for BulkProcessor; at
the very least, the javadoc should mention that exceptions must not be thrown
from the listener itself.
I've filed an issue that demonstrates the problem with a unit test here:
An example BulkProcessor.Listener would be:
private static class BulkProcessorProblematicListener implements BulkProcessor.Listener {

    @Override
    public void beforeBulk(long executionId, BulkRequest request) {
    }

    @Override
    public void afterBulk(long executionId, BulkRequest request, BulkResponse response) {
        if (response.hasFailures()) {
            throw new RuntimeException("Failure in response - " + response.buildFailureMessage());
        }
    }

    @Override
    public void afterBulk(long executionId, BulkRequest request, Throwable failure) {
        // throwing an exception here makes the transport close,
        // resulting in NoNodeAvailableException subsequently
        throw new RuntimeException("Caught exception in bulk: " + request + ", failure: " + failure, failure);
    }
}
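For context, this is roughly how the listener gets wired in; a minimal sketch, assuming an already built Client named "client" and purely illustrative index/type names:

BulkProcessor bulkProcessor = BulkProcessor.builder(client, new BulkProcessorProblematicListener())
        .setBulkActions(1000)
        .setConcurrentRequests(1)
        .build();

// once a bulk fails and afterBulk() throws, the transport ends up closed and
// every later request fails with NoNodeAvailableException
bulkProcessor.add(new IndexRequest("myindex", "mytype")
        .source(Collections.singletonMap("field", "value")));
bulkProcessor.close();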
I do not think there is a problem, since the methods are not declared to throw
any exceptions.
Here are some best practices:
Check whether your bulk feed is driven by interval or by doc volume and adjust
the BulkProcessor setup accordingly. The defaults are 5 MB (doc volume) and a
flush interval of 5 seconds.
Check each BulkResponse in the BulkProcessor.Listener for failures and reindex
the corresponding documents. If you want to stop bulk indexing, set a volatile
boolean error flag that can be checked before a new bulk is submitted (see the
sketch after this list).
After adding the last bulk request, wait at least 5 seconds before closing the
BulkProcessor, so a flush is triggered for the last bulk.
Do not use more than one thread to add bulk requests, since the underlying
list of requests is not thread-safe.
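To make the failure-handling point concrete, here is a minimal sketch of a listener that records failures in a volatile flag instead of throwing, plus a builder configured explicitly with the defaults mentioned above (the class name and the "client"/"listener" variables are just placeholders):

private static class FlaggingListener implements BulkProcessor.Listener {

    // checked by the feeding thread before a new bulk is submitted
    private volatile boolean error = false;

    boolean hasError() {
        return error;
    }

    @Override
    public void beforeBulk(long executionId, BulkRequest request) {
    }

    @Override
    public void afterBulk(long executionId, BulkRequest request, BulkResponse response) {
        if (response.hasFailures()) {
            // record the failure instead of throwing; failed documents can be reindexed here
            error = true;
        }
    }

    @Override
    public void afterBulk(long executionId, BulkRequest request, Throwable failure) {
        // again, record instead of rethrowing
        error = true;
    }
}

BulkProcessor bulkProcessor = BulkProcessor.builder(client, listener)
        .setBulkSize(new ByteSizeValue(5, ByteSizeUnit.MB))
        .setFlushInterval(TimeValue.timeValueSeconds(5))
        .build();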
Documenting this would definitely help - something that results in the
transport getting closed is pretty bad.
Also, it would be better if Elasticsearch could internally guard against
mistakes users might make and be more robust by not closing down the transport
when they throw exceptions unknowingly. I'd argue this is a bug unless it's
clearly mentioned in the javadoc.
Also, there is some confusion about the points you mentioned below, inlined.
I do not think there is a problem, since the methods are not declared to throw
any exceptions.
Here are some best practices:
Check whether your bulk feed is driven by interval or by doc volume and adjust
the BulkProcessor setup accordingly. The defaults are 5 MB (doc volume) and a
flush interval of 5 seconds.
Check each BulkResponse in the BulkProcessor.Listener for failures and reindex
the corresponding documents. If you want to stop bulk indexing, set a volatile
boolean error flag that can be checked before a new bulk is submitted.
After adding the last bulk request, wait at least 5 seconds before closing the
BulkProcessor, so a flush is triggered for the last bulk.
Correct me if I'm wrong, but this looks totally unnecessary. From the javadoc
of close():
"Closes the processor. If flushing by time is enabled, then it's shutdown. Any
remaining bulk actions are flushed."
Also, looking at the code, it does seem like the thread calling close() would
flush any remaining items.
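In other words, something like this (a sketch; "lastRequest" and "bulkProcessor" are placeholders from earlier) should already flush the final partial bulk without any extra wait:

bulkProcessor.add(lastRequest);
// per the javadoc, close() stops time-based flushing and flushes any remaining actions
bulkProcessor.close();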
Do not use more than one thread to add bulk requests, since the underlying
list of requests is not thread-safe.
This came as a surprise. It is in total contrast to what 'concurrentRequests'
in BulkProcessor.Builder suggests. All the add(), execute() and close()
methods in BulkProcessor also seem to be synchronized, so I don't understand
how the underlying BulkRequest not being thread-safe matters. Are you sure
this is recommended?
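For reference, this is the setting I mean; a sketch with an illustrative value:

// up to 4 bulk requests may execute concurrently while documents keep being added;
// add() itself appears to be synchronized, which is why feeding from several threads looked safe
BulkProcessor bulkProcessor = BulkProcessor.builder(client, listener)
        .setConcurrentRequests(4)
        .build();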