I see...  I can reproduce this with:
    @Test
    void bulkIngesterFlush() throws IOException {
        final var size = 100_000;
        try (final BulkIngester<Void> ingester = BulkIngester.of(b -> b
                .client(client)
                .globalSettings(gs -> gs
                        .index(indexName)
                )
                .maxOperations(10_000)
                .flushInterval(5, TimeUnit.SECONDS)
        )) {
            final var data = BinaryData.of("{\"foo\":\"bar\"}".getBytes(StandardCharsets.UTF_8), ContentType.APPLICATION_JSON);
            for (int i = 0; i < size; i++) {
                ingester.add(bo -> bo.index(io -> io.document(data)));
            }
            ingester.flush();
            client.indices().refresh(rr -> rr.index(indexName));
            final SearchResponse<Void> response = client.search(sr -> sr.index(indexName).trackTotalHits(tth -> tth.enabled(true)), Void.class);
            assertNotNull(response.hits().total());
            assertEquals(size, response.hits().total().value());
        }
    }
Indeed, it seems that calling flush() is just asynchronous and does not wait for the operations to be available/visible.
flush() just empties the list of pending bulk requests.
I can see in the code that when closing, we explicitly wait for the operations to be executed:
But all that does not seem to be visible from outside the class.
@ltrotta WDYT of adding an option to the flush() method like flush(Boolean... wait)?
And in that case, add:
    private final FnCondition flushedCondition = new FnCondition(lock, this::flushed);
    private boolean flushed() {
        return operations.isEmpty() && requestsInFlightCount == 0 && listenerInProgressCount.get() == 0;
    }
    private boolean closedAndFlushed() {
        return isClosed && flushed();
    }
And then change flush a bit to allow waiting for the flush to be completed?
@mattar the way I'm solving this today in integration tests is by "waiting for x documents to be available". Something like:
    public static long awaitBusy(LongSupplier breakSupplier, Long expected, long maxWaitTime, TimeUnit unit) throws InterruptedException {
        long maxTimeInMillis = TimeUnit.MILLISECONDS.convert(maxWaitTime, unit);
        long timeInMillis = 1;
        long sum = 0;
        while (sum + timeInMillis < maxTimeInMillis) {
            long current = breakSupplier.getAsLong();
            logger.trace("Check if {} is equal to {}", current, expected);
            if (expected == null && current >= 1) {
                return current;
            } else if (expected != null && current == expected) {
                return expected;
            }
            logger.trace("Sleep for {} because {} is not equal to {}", timeInMillis, current, expected);
            Thread.sleep(timeInMillis);
            sum += timeInMillis;
            timeInMillis = Math.min(AWAIT_BUSY_THRESHOLD, timeInMillis * 2);
        }
        timeInMillis = maxTimeInMillis - sum;
        Thread.sleep(Math.max(timeInMillis, 0));
        return breakSupplier.getAsLong();
    }
// Later in tests
        awaitBusy(() -> {
            long totalHits;
            // Let's search for entries
            try {
                // Make sure we refresh indexed docs before counting
                refresh(request.getIndex());
                // Search for the number of docs in the index
                response[0] = documentService.search(request);
            } catch (RuntimeException | IOException e) {
                logger.warn("error caught", e);
                return -1;
            } catch (ElasticsearchClientException e) {
                logger.debug("error caught", e);
                return -1;
            }
            totalHits = response[0].getTotalHits();
            logger.debug("got so far [{}] hits on expected [{}]", totalHits, expected);
            return totalHits;
        }, expected, timeout.millis(), TimeUnit.MILLISECONDS);
This way, I'm not closing the bulk ingester until the application closes.