I see... I can reproduce this with:
```java
@Test
void bulkIngesterFlush() throws IOException {
    final var size = 100_000;
    try (final BulkIngester<Void> ingester = BulkIngester.of(b -> b
            .client(client)
            .globalSettings(gs -> gs
                    .index(indexName)
            )
            .maxOperations(10_000)
            .flushInterval(5, TimeUnit.SECONDS)
    )) {
        final var data = BinaryData.of("{\"foo\":\"bar\"}".getBytes(StandardCharsets.UTF_8),
                ContentType.APPLICATION_JSON);
        for (int i = 0; i < size; i++) {
            ingester.add(bo -> bo.index(io -> io.document(data)));
        }
        ingester.flush();
        client.indices().refresh(rr -> rr.index(indexName));
        final SearchResponse<Void> response = client.search(sr -> sr
                .index(indexName)
                .trackTotalHits(tth -> tth.enabled(true)), Void.class);
        assertNotNull(response.hits().total());
        assertEquals(size, response.hits().total().value());
    }
}
```
Indeed, it seems that calling `flush()` is asynchronous and does not wait for the operations to be available/visible: `flush()` just empties the list of pending bulk requests. I can see in the code that, when closing, we explicitly wait for the operations to be executed, but all that does not seem to be visible from outside the class.

@ltrotta WDYT of adding an option to the `flush()` method, like `flush(Boolean... wait)`? And in that case, add:
```java
private final FnCondition flushedCondition = new FnCondition(lock, this::flushed);

private boolean flushed() {
    return operations.isEmpty() && requestsInFlightCount == 0 && listenerInProgressCount.get() == 0;
}

private boolean closedAndFlushed() {
    return isClosed && flushed();
}
```
And then change `flush()` a bit to allow waiting for the flush to be completed?
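For illustration, here is a minimal, self-contained sketch of the lock/condition pattern such a waiting `flush()` could rely on. This is not the actual `BulkIngester` implementation; the class, its fields (`pending`, `inFlight`), and the executor are all illustrative stand-ins:

```java
import java.util.ArrayDeque;
import java.util.Queue;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.ReentrantLock;

// Toy sketch of a flush(wait) that can block until all submitted operations
// have completed. Names are illustrative, not BulkIngester internals.
final class WaitingFlusher implements AutoCloseable {

    private final ReentrantLock lock = new ReentrantLock();
    private final Condition flushedCondition = lock.newCondition();
    private final Queue<Runnable> pending = new ArrayDeque<>();
    private final ExecutorService executor = Executors.newFixedThreadPool(2);
    private int inFlight = 0;

    void add(Runnable op) {
        lock.lock();
        try {
            pending.add(op);
        } finally {
            lock.unlock();
        }
    }

    // Everything queued has been handed off AND has finished executing.
    private boolean flushed() {
        return pending.isEmpty() && inFlight == 0;
    }

    void flush(boolean wait) throws InterruptedException {
        lock.lock();
        try {
            // Hand pending operations off asynchronously, like the real
            // ingester hands bulk requests to the client.
            while (!pending.isEmpty()) {
                Runnable op = pending.poll();
                inFlight++;
                executor.submit(() -> {
                    op.run();
                    lock.lock();
                    try {
                        inFlight--;
                        if (flushed()) {
                            flushedCondition.signalAll();
                        }
                    } finally {
                        lock.unlock();
                    }
                });
            }
            // Optionally block until the in-flight operations have completed.
            while (wait && !flushed()) {
                flushedCondition.await();
            }
        } finally {
            lock.unlock();
        }
    }

    @Override
    public void close() {
        executor.shutdown();
    }

    public static void main(String[] args) throws Exception {
        AtomicInteger done = new AtomicInteger();
        try (WaitingFlusher flusher = new WaitingFlusher()) {
            for (int i = 0; i < 100; i++) {
                flusher.add(done::incrementAndGet);
            }
            flusher.flush(true);
            // flush(true) only returns once every operation has run.
            System.out.println(done.get()); // prints 100
        }
    }
}
```

The key point the sketch tries to capture: the condition (`flushed()`) checks both the pending queue and the in-flight count, so callers that pass `wait = true` do not return until results are actually visible to the completion listeners.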
@mattar the way I'm solving this today in integration tests is by "waiting for x documents to be available". Something like:
```java
public static long awaitBusy(LongSupplier breakSupplier, Long expected, long maxWaitTime, TimeUnit unit)
        throws InterruptedException {
    long maxTimeInMillis = TimeUnit.MILLISECONDS.convert(maxWaitTime, unit);
    long timeInMillis = 1;
    long sum = 0;
    while (sum + timeInMillis < maxTimeInMillis) {
        long current = breakSupplier.getAsLong();
        logger.trace("Check if {} is equal to {}", current, expected);
        if (expected == null && current >= 1) {
            return current;
        } else if (expected != null && current == expected) {
            return expected;
        }
        logger.trace("Sleep for {} because {} is not equal to {}", timeInMillis, current, expected);
        Thread.sleep(timeInMillis);
        sum += timeInMillis;
        timeInMillis = Math.min(AWAIT_BUSY_THRESHOLD, timeInMillis * 2);
    }
    timeInMillis = maxTimeInMillis - sum;
    Thread.sleep(Math.max(timeInMillis, 0));
    return breakSupplier.getAsLong();
}
```
```java
// Later in tests
awaitBusy(() -> {
    long totalHits;
    // Let's search for entries
    try {
        // Make sure we refresh indexed docs before counting
        refresh(request.getIndex());
        // Search for the number of docs in the index
        response[0] = documentService.search(request);
    } catch (RuntimeException | IOException e) {
        logger.warn("error caught", e);
        return -1;
    } catch (ElasticsearchClientException e) {
        logger.debug("error caught", e);
        return -1;
    }
    totalHits = response[0].getTotalHits();
    logger.debug("got so far [{}] hits on expected [{}]", totalHits, expected);
    return totalHits;
}, expected, timeout.millis(), TimeUnit.MILLISECONDS);
```
This way, I'm not closing the bulk ingester until the application closes.
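For anyone who wants to try the polling workaround in isolation, here is a stripped-down, self-contained variant of the backoff loop above. It drops the logger and the `null`-expected branch, and substitutes a hypothetical 1-second sleep cap for `AWAIT_BUSY_THRESHOLD`; the fake counter in `main` stands in for the search-and-count callback:

```java
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;
import java.util.function.LongSupplier;

public final class AwaitBusyDemo {

    // Hypothetical cap on the sleep between polls; the original helper
    // uses an AWAIT_BUSY_THRESHOLD constant for this.
    private static final long MAX_SLEEP_MILLIS = 1000;

    static long awaitBusy(LongSupplier supplier, long expected, long maxWaitTime, TimeUnit unit)
            throws InterruptedException {
        long maxMillis = unit.toMillis(maxWaitTime);
        long sleep = 1;
        long waited = 0;
        while (waited + sleep < maxMillis) {
            long current = supplier.getAsLong();
            if (current == expected) {
                return current;
            }
            Thread.sleep(sleep);
            waited += sleep;
            sleep = Math.min(MAX_SLEEP_MILLIS, sleep * 2); // exponential backoff
        }
        // Spend whatever budget is left, then take a final reading.
        Thread.sleep(Math.max(maxMillis - waited, 0));
        return supplier.getAsLong();
    }

    public static void main(String[] args) throws InterruptedException {
        // Simulate documents becoming visible over time, as bulk
        // operations complete in the background.
        AtomicLong visible = new AtomicLong();
        Thread indexer = new Thread(() -> {
            for (int i = 0; i < 10; i++) {
                try {
                    Thread.sleep(5);
                } catch (InterruptedException e) {
                    return;
                }
                visible.addAndGet(10);
            }
        });
        indexer.start();
        long hits = awaitBusy(visible::get, 100, 5, TimeUnit.SECONDS);
        System.out.println(hits); // prints 100 once the fake indexing finishes
    }
}
```

The doubling sleep keeps early polls cheap while bounding the total number of wake-ups, which is why the same shape works for both fast local runs and slow CI machines.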