BulkIngester and Integration Testing

Can someone point me to some examples of integration testing with the BulkIngester? Specifically, how do I test a class method that uses the BulkIngester? I'm having timing issues when calling flush on the BulkIngester, and my test fails randomly as a result. Thanks.

This might help:

Thanks. I looked at that already. I can't call .close() on the BulkIngester or it causes other tests to fail. I have many classes that encapsulate the BulkIngester and add operations to it. I'm trying to test those classes. For example, trying to test method1 below is problematic. Any recommendations?

public void method1() {
    BulkIngester.instance().add(/* operation */);
}

What I do in that case is wait up to 30 seconds for the index to contain the expected number of documents.
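A minimal sketch of that kind of wait loop, in case it is useful. The class name `WaitFor`, the method `awaitCount`, and its parameters are hypothetical and not part of the Elasticsearch client; the supplier stands in for whatever query returns the current document count.

```java
import java.time.Duration;
import java.util.function.LongSupplier;

// Hypothetical test helper: polls a count until it matches or the timeout elapses.
final class WaitFor {
    private WaitFor() {}

    /**
     * Calls countSupplier repeatedly until it returns expected.
     * Returns true on a match, false if the timeout elapses or the thread is interrupted.
     */
    static boolean awaitCount(LongSupplier countSupplier, long expected,
                              Duration timeout, Duration pollInterval) {
        long deadline = System.nanoTime() + timeout.toNanos();
        while (true) {
            if (countSupplier.getAsLong() == expected) {
                return true;
            }
            if (System.nanoTime() - deadline >= 0) {
                return false; // timed out without seeing the expected count
            }
            try {
                Thread.sleep(pollInterval.toMillis());
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt(); // preserve the interrupt flag
                return false;
            }
        }
    }
}
```

In a test, the supplier would typically refresh the index and run the count query, with a timeout such as `Duration.ofSeconds(30)`. Client calls that throw `IOException` would need to be wrapped in an unchecked exception inside the supplier.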

See

Thanks.

When I create an index in an integration test and then try to use it, I get random failures. I tried something like this, and it seemed to work at first but then started failing.

    protected boolean waitForIndexToBeAvailable(String index) throws IOException {
        HealthResponse healthResponse = elasticsearchClient.cluster().health(hr -> hr
            .index(index)
            .waitForStatus(HealthStatus.Yellow)
            .waitForActiveShards(w -> w
                .count(1))
            .timeout(t -> t
                .time("10s")));

        return !healthResponse.timedOut();
    }

This is really frustrating. Any advice?

Thanks.

What are the failures?

Thanks. The test passes sometimes and fails other times. The failure is usually on the first assertion (assertEquals). It's random and seems to be a timing issue. The SUT creates two indices, so my test first checks that both indices exist. Then it runs two searches to verify the document count in each of the two indices. I'm using the ElasticsearchClient (not the async client), and both the test and the SUT are single-threaded. Here's the test:

    @Test
    public void testRunRetention() throws IOException {
        HistoryRetentionTask task = new HistoryRetentionTask(systemIds);
        task.runRetention();

        // Need to flush the bulk ingester now since it was used in runRetention
        bulkIngester.flush();

        if (!waitForIndexToBeAvailable("test-" + dateSuffix)) {
            fail("Index not available");
        }

        if (!waitForIndexToBeAvailable("test1-" + dateSuffix)) {
            fail("Index not available");
        }

        // Refresh so we can search for docs
        refresh("test-" + dateSuffix);
        refresh("test1-" + dateSuffix);

        // Check correct number of results in each system id index
        assertEquals(3, getHitsTotal("test-" + dateSuffix));
        assertEquals(1, getHitsTotal("test1-" + dateSuffix));
    }
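
One possible explanation, offered as an assumption to verify rather than a confirmed diagnosis: if flush() only hands the buffered operations to the client and returns before the bulk response arrives, the refresh and the assertions can run before the documents are indexed. A rough sketch of a tracker that lets a test block until all in-flight bulk requests have completed is shown below. The class `InFlightTracker` and its methods are hypothetical; the assumption is that `started()` and `finished()` would be called from before-request and after-request hooks (for example, a bulk listener registered when the ingester is built).

```java
import java.util.concurrent.TimeUnit;

// Hypothetical helper: counts requests in flight and lets a test wait for zero.
final class InFlightTracker {
    private int inFlight = 0;

    // Call when a bulk request is about to be sent.
    public synchronized void started() {
        inFlight++;
    }

    // Call when a bulk request completes, whether it succeeded or failed.
    public synchronized void finished() {
        inFlight--;
        notifyAll(); // wake up any thread blocked in awaitIdle
    }

    /**
     * Blocks until no requests are in flight.
     * Returns false if the timeout elapses or the thread is interrupted first.
     */
    public synchronized boolean awaitIdle(long timeout, TimeUnit unit) {
        long deadline = System.nanoTime() + unit.toNanos(timeout);
        while (inFlight > 0) {
            long remainingNanos = deadline - System.nanoTime();
            if (remainingNanos <= 0) {
                return false;
            }
            try {
                // Releases the monitor while waiting; loop re-checks after wake-up.
                TimeUnit.NANOSECONDS.timedWait(this, remainingNanos);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                return false;
            }
        }
        return true;
    }
}
```

With something like this in place, the test would call `bulkIngester.flush()`, then `awaitIdle(...)`, and only then refresh the indices and assert on the hit counts.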
