dadoonet
(David Pilato)
October 3, 2023, 11:37am
Welcome!
Yes. You can use it the same way you were using the BulkProcessor.
Small usage example here:
void bulkIngester() throws IOException {
    int size = 1000;
    try {
        client.indices().delete(dir -> dir.index("bulk"));
    } catch (ElasticsearchException ignored) { }
    BulkIngester<Void> ingester = BulkIngester.of(b -> b
        .client(client)
        .listener(new BulkListener<Void>() {
            @Override
            public void beforeBulk(long executionId, BulkRequest request, List<Void> voids) {
                logger.debug("going to execute bulk of {} requests", request.operations().size());
            }
            @Override
            public void afterBulk(long executionId, BulkRequest request, List<Void> voids, BulkResponse response) {
                logger.debug("bulk executed {} errors", response.errors() ? "with" : "without");
            }
            @Override
            public void afterBulk(long executionId, BulkRequest request, List<Void> voids, Throwable failure) {
(The example file is truncated here.)
AFAICS in the code, BulkIngester uses internal synchronisation, so it's safe to use the same instance from multiple threads:
// Synchronization objects
private final ReentrantLock lock = new ReentrantLock();
private final FnCondition addCondition = new FnCondition(lock, this::canAddOperation);
private final FnCondition sendRequestCondition = new FnCondition(lock, this::canSendRequest);
private final FnCondition closeCondition = new FnCondition(lock, this::closedAndFlushed);
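To make the idea concrete, here is a minimal sketch of the general pattern: one lock guards all of the batching state, so several threads can add to a single shared instance. This is not the BulkIngester code. The class `SharedBatcher` and all of its methods are hypothetical names made up for illustration, it only uses the JDK's `ReentrantLock`, and it leaves out the condition objects (`FnCondition` is internal to the client and is not reproduced here).

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.locks.ReentrantLock;

// Hypothetical illustration only: NOT the elasticsearch-java implementation.
final class SharedBatcher {
    // Single lock guarding every field below.
    private final ReentrantLock lock = new ReentrantLock();
    private final int maxOperations;
    private List<String> pending = new ArrayList<>();
    private int flushedOperations = 0;
    private int flushedBatches = 0;

    SharedBatcher(int maxOperations) {
        this.maxOperations = maxOperations;
    }

    // Safe to call from any thread: all state changes happen under the lock.
    void add(String operation) {
        lock.lock();
        try {
            pending.add(operation);
            if (pending.size() >= maxOperations) {
                flushLocked();
            }
        } finally {
            lock.unlock();
        }
    }

    // Flushes whatever is left over, as you would when closing at the end of a run.
    void close() {
        lock.lock();
        try {
            if (!pending.isEmpty()) {
                flushLocked();
            }
        } finally {
            lock.unlock();
        }
    }

    // Caller must hold the lock. A real client would send a bulk request here.
    private void flushLocked() {
        flushedOperations += pending.size();
        flushedBatches++;
        pending = new ArrayList<>();
    }

    int flushedOperations() {
        lock.lock();
        try {
            return flushedOperations;
        } finally {
            lock.unlock();
        }
    }

    int flushedBatches() {
        lock.lock();
        try {
            return flushedBatches;
        } finally {
            lock.unlock();
        }
    }

    // Starts `threads` producers that all add to ONE shared batcher, then closes it.
    static SharedBatcher runDemo(int threads, int perThread, int maxOperations) {
        SharedBatcher batcher = new SharedBatcher(maxOperations);
        List<Thread> workers = new ArrayList<>();
        for (int t = 0; t < threads; t++) {
            final int id = t;
            Thread worker = new Thread(() -> {
                for (int i = 0; i < perThread; i++) {
                    batcher.add("op-" + id + "-" + i);
                }
            });
            workers.add(worker);
            worker.start();
        }
        for (Thread worker : workers) {
            try {
                worker.join();
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                throw new IllegalStateException("interrupted while waiting for producers", e);
            }
        }
        batcher.close();
        return batcher;
    }

    public static void main(String[] args) {
        SharedBatcher batcher = runDemo(4, 250, 100);
        // prints "1000 operations in 10 batches"
        System.out.println(batcher.flushedOperations() + " operations in "
                + batcher.flushedBatches() + " batches");
    }
}
```

Because every add and every flush happens while holding the same lock, no operation is lost or counted twice, whichever thread happens to trigger the flush. The real ingester does more than this sketch (for example, it sends requests asynchronously), so treat it as an illustration of the locking idea only.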