Bulk Ingester flush timing issues with wait_for refresh policy

Hello everyone,

I’m working with the bulk ingester in Elastic 8.15.3 and encountering some timing issues with flush. I configured the bulk ingester with a refresh policy set to wait_for in the global settings, expecting that flush would ensure data availability immediately after it's called. However, flush doesn’t seem to wait for indexing completion as anticipated.

In reviewing the code, I noticed that the close method behaves differently: it calls flush and waits for it to finish, whereas flush alone doesn’t seem to block until the operation is fully processed. Could anyone confirm if this behavior is expected or if there’s a way to enforce synchronous waiting with flush?

To mitigate this problem, we're currently relying on closing and creating a new Ingester, but this is adding significant overhead to our production environments.
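
Roughly, the workaround looks like this (just a sketch; the helper name and the `client`/`indexName` parameters are from our setup, not a recommended pattern):

```java
import co.elastic.clients.elasticsearch.ElasticsearchClient;
import co.elastic.clients.elasticsearch._helpers.bulk.BulkIngester;
import co.elastic.clients.elasticsearch._types.Refresh;

class IngesterRecycleSketch {
    // Sketch of our workaround: close() waits for all in-flight operations,
    // so we close the current ingester and build a fresh one before searching.
    static <T> BulkIngester<T> closeAndRecreate(BulkIngester<T> ingester,
                                                ElasticsearchClient client,
                                                String indexName) {
        ingester.close(); // blocks until pending bulk requests have completed
        return BulkIngester.of(b -> b
                .client(client)
                .globalSettings(gs -> gs
                        .index(indexName)
                        .refresh(Refresh.WaitFor)));
    }
}
```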

Thanks in advance!

I'm curious about what you are trying to do here with the Bulk Ingester and why you would like it to "wait for" the flush being done...

The way I see the Bulk Ingester, it's "fire and forget".
Like: I want to index a document and it will eventually be visible. I just don't know exactly when...

My question is: is this a "production" issue? If so, could you share a bit of your code so I can understand better what you are trying to do? Are you trying to do something from a listener?
Or is it an integration test issue? I suspect the latter, as I had to deal with exactly the same problem, although you are saying that there's an impact in production...

In short:

  • Fire and forget: I'm using the BulkIngester
  • I want to control the freshness of the data: I'm using "manual" Bulk Requests
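
For the second case, a minimal sketch with the Java client could look like this (the index name, the `Map`-based documents, and the helper name are placeholders, not from this thread); with `Refresh.WaitFor`, the bulk call itself does not return until the changes are visible to search:

```java
import co.elastic.clients.elasticsearch.ElasticsearchClient;
import co.elastic.clients.elasticsearch._types.Refresh;
import co.elastic.clients.elasticsearch.core.BulkResponse;
import java.io.IOException;
import java.util.List;
import java.util.Map;

class ManualBulkSketch {
    // Sketch: indexes "docs" and only returns once they are searchable.
    static BulkResponse indexAndWait(ElasticsearchClient client,
                                     List<Map<String, Object>> docs) throws IOException {
        return client.bulk(b -> {
            b.index("my-index")          // placeholder index name
             .refresh(Refresh.WaitFor);  // block until changes are visible to search
            for (Map<String, Object> doc : docs) {
                b.operations(op -> op.index(io -> io.document(doc)));
            }
            return b;
        });
    }
}
```

After this call returns, a search will see the indexed documents; `BulkResponse.errors()` tells you whether any individual item failed.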

I'm maintaining a low-code platform where users can develop scripts to perform CRUD operations. I need to ensure read consistency within the same script that the user creates. So, if the user writes code like this:

for (let i = 0; i < 10; i++) {
  create({ num: i });
}

let res = search({ query: { ... } });

What I'm currently doing internally is adding each create command to the ingester. Before the search occurs, I flush the ingester (which is configured with the wait_for option), assuming that the data would then be available for search. However, that is not the case. If I close the ingester, though, the search becomes consistent.

That wouldn’t be a problem for this example, but in some test scenarios, it's common for users to perform many single index requests followed by a search. Internally, I'm currently not differentiating between these two cases (maybe we should…).

Just to clarify, we also have an implementation that uses "manual" bulk requests, but the performance gains and convenience of the bulk ingester have been very significant, especially the multithreading and error handling, particularly in cases where the user performs a high volume of writes.

In short, what I'm getting from you is that we should find a way to differentiate between these two cases, right? In cases where the user performs a high volume of writes, it would be acceptable for the data not to be immediately available. But if I'm dealing with less data and need to read my write, then I should consider using "manual" requests.

I see... I can reproduce this with:

    @Test
    void bulkIngesterFlush() throws IOException {
        final var size = 100_000;
        try (final BulkIngester<Void> ingester = BulkIngester.of(b -> b
                .client(client)
                .globalSettings(gs -> gs
                        .index(indexName)
                )
                .maxOperations(10_000)
                .flushInterval(5, TimeUnit.SECONDS)
        )) {
            final var data = BinaryData.of("{\"foo\":\"bar\"}".getBytes(StandardCharsets.UTF_8), ContentType.APPLICATION_JSON);
            for (int i = 0; i < size; i++) {
                ingester.add(bo -> bo.index(io -> io.document(data)));
            }

            ingester.flush(); // returns immediately; the last bulk request(s) may still be in flight

            client.indices().refresh(rr -> rr.index(indexName)); // explicit refresh, so visibility is not the issue
            final SearchResponse<Void> response = client.search(sr -> sr.index(indexName).trackTotalHits(tth -> tth.enabled(true)), Void.class);
            assertNotNull(response.hits().total());
            assertEquals(size, response.hits().total().value()); // fails: some documents have not been indexed yet
        }
    }

Indeed, it seems that calling flush() is just asynchronous and does not wait for the operations to be available/visible.
flush() just empties the list of pending bulk requests.

I can see in the code that, when closing, we explicitly wait for the operations to be executed.

But all that does not seem to be visible from outside the class.

@ltrotta WDYT of adding an option to the flush() method like flush(Boolean... wait)?

And in that case, add:

    private final FnCondition flushedCondition = new FnCondition(lock, this::flushed);

    private boolean flushed() {
        return operations.isEmpty() && requestsInFlightCount == 0 && listenerInProgressCount.get() == 0;
    }

    private boolean closedAndFlushed() {
        return isClosed && flushed();
    }

And then change flush a bit to allow waiting for the flush to be completed?
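
A self-contained sketch of that idea, using a plain `ReentrantLock`/`Condition` instead of the client's internal `FnCondition` (this is hypothetical code illustrating the proposal, not the actual BulkIngester implementation; the names only mirror the snippet above):

```java
import java.util.ArrayDeque;
import java.util.Queue;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.ReentrantLock;

class WaitableFlusher {
    private final ReentrantLock lock = new ReentrantLock();
    private final Condition flushedCondition = lock.newCondition();
    private final Queue<Runnable> operations = new ArrayDeque<>();
    private int requestsInFlightCount = 0;

    void add(Runnable operation) {
        lock.lock();
        try {
            operations.add(operation);
        } finally {
            lock.unlock();
        }
    }

    private boolean flushed() {
        return operations.isEmpty() && requestsInFlightCount == 0;
    }

    // Fire-and-forget flush: hands the pending batch to a worker thread.
    void flush() {
        lock.lock();
        try {
            if (operations.isEmpty()) {
                return;
            }
            requestsInFlightCount++;
            Queue<Runnable> batch = new ArrayDeque<>(operations);
            operations.clear();
            new Thread(() -> {
                batch.forEach(Runnable::run); // "execute" the bulk request
                lock.lock();
                try {
                    requestsInFlightCount--;
                    if (flushed()) {
                        flushedCondition.signalAll();
                    }
                } finally {
                    lock.unlock();
                }
            }).start();
        } finally {
            lock.unlock();
        }
    }

    // The proposed variant: flush, then block until everything has completed.
    void flushAndWait(long timeout, TimeUnit unit) {
        flush();
        lock.lock();
        try {
            long nanos = unit.toNanos(timeout);
            while (!flushed() && nanos > 0L) {
                nanos = flushedCondition.awaitNanos(nanos);
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        } finally {
            lock.unlock();
        }
    }
}
```

A caller needing read-your-writes behavior would then call `flushAndWait(10, TimeUnit.SECONDS)` instead of plain `flush()` before searching.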

@mattar the way I'm solving this today in integration tests is by "waiting for x documents to be available". Something like:

    public static long awaitBusy(LongSupplier breakSupplier, Long expected, long maxWaitTime, TimeUnit unit) throws InterruptedException {
        long maxTimeInMillis = TimeUnit.MILLISECONDS.convert(maxWaitTime, unit);
        long timeInMillis = 1;
        long sum = 0;

        while (sum + timeInMillis < maxTimeInMillis) {
            long current = breakSupplier.getAsLong();
            logger.trace("Check if {} is equal to {}", current, expected);
            if (expected == null && current >= 1) {
                return current;
            } else if (expected != null && current == expected) {
                return expected;
            }
            logger.trace("Sleep for {} because {} is not equal to {}", timeInMillis, current, expected);
            Thread.sleep(timeInMillis);
            sum += timeInMillis;
            timeInMillis = Math.min(AWAIT_BUSY_THRESHOLD, timeInMillis * 2);
        }
        timeInMillis = maxTimeInMillis - sum;
        Thread.sleep(Math.max(timeInMillis, 0));
        return breakSupplier.getAsLong();
    }

// Later in tests

        awaitBusy(() -> {
            long totalHits;

            // Let's search for entries
            try {
                // Make sure we refresh indexed docs before counting
                refresh(request.getIndex());
                // Search for the number of docs in the index
                response[0] = documentService.search(request);
            } catch (RuntimeException | IOException e) {
                logger.warn("error caught", e);
                return -1;
            } catch (ElasticsearchClientException e) {
                logger.debug("error caught", e);
                return -1;
            }
            totalHits = response[0].getTotalHits();

            logger.debug("got so far [{}] hits on expected [{}]", totalHits, expected);

            return totalHits;
        }, expected, timeout.millis(), TimeUnit.MILLISECONDS);

This way, I'm not closing the bulk ingester until the application closes.

Correcting myself, after some testing, I realized that the statement I made earlier does not seem to be accurate:

but this is adding significant overhead to our production environments.

Sorry about that. We conducted some benchmarks and couldn't measure any significant penalty from closing/creating.

Ha great!
I was also curious about the overhead! Thanks for clarifying :wink:
