BulkProcessor never executing afterBulk

Hello everyone,

I'm trying to insert data into Elasticsearch using RestHighLevelClient and BulkProcessor. The code I'm using to build the BulkProcessor is the following:

    BulkProcessor getBulkProcessor(RestHighLevelClient restHighLevelClient) {
        BiConsumer<BulkRequest, ActionListener<BulkResponse>> consumer = (request, bulkListener) -> {
            try {
                restHighLevelClient.bulk(request, RequestOptions.DEFAULT);
            } catch (IOException e) {
                final String message = "Failed to create BulkProcessor for ElasticSearch";
                throw new RuntimeException(message, e);
            }
        };

        return BulkProcessor.builder(consumer, getListener()).setConcurrentRequests(0).build();
    }

    BulkProcessor.Listener getListener() {
        return new BulkProcessor.Listener() {
            @Override
            public void beforeBulk(long executionId, BulkRequest request) {
                int numberOfActions = request.numberOfActions();
                System.out.println(String.format("Executing bulk %d with %d requests", executionId, numberOfActions));
            }

            @Override
            public void afterBulk(long executionId, BulkRequest request, BulkResponse bulkResponse) {
                if (bulkResponse.hasFailures()) {
                    for (BulkItemResponse bulkItemResponse : bulkResponse) {
                        if (bulkItemResponse.isFailed()) {
                            BulkItemResponse.Failure failure = bulkItemResponse.getFailure();
                            throw new RuntimeException(String.format("Adding document %s to ElasticSearch failed", failure.getId()),
                                                       failure.getCause());
                        }
                    }
                    System.out.println(String.format("Bulk %d executed with failures", executionId));
                } else {
                    System.out.println(String.format("Bulk %d completed in %d milliseconds", executionId, bulkResponse.getTook().getMillis()));
                }
            }

            @Override
            public void afterBulk(long executionId, BulkRequest request, Throwable failure) {
                System.out.println(String.format("Failed to execute bulk %s", failure));
                throw new RuntimeException(failure);
            }
        };
    }

Then, in my main method:

    public static void main(String[] args) {
        BulkProcessor bulkProcessor = getBulkProcessor(/* pass RestHighLevelClient here */);
        DocWriteRequest docWriteRequest = /* logic to build a DocWriteRequest */;
        bulkProcessor.add(docWriteRequest);
        bulkProcessor.flush();
        bulkProcessor.close();
    }

My problem is the following:
My Elasticsearch instance successfully adds the new document, but my BulkProcessor doesn't seem to receive a response and afterBulk is never executed (hence the program doesn't terminate, since I add elements synchronously).

Does something seem to be wrong with what I'm doing, or am I missing something?

Thanks for your help!

Changing bulk to bulkAsync in the BiConsumer solved my problem.
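
For anyone hitting the same thing: the synchronous bulk() call returns the BulkResponse directly and never invokes the ActionListener that the BulkProcessor passes into the consumer, so the processor is never notified and afterBulk can't fire. As a rough sketch (same method as above, only the consumer changed, and assuming the same restHighLevelClient variable), the consumer can delegate to bulkAsync instead:

    BulkProcessor getBulkProcessor(RestHighLevelClient restHighLevelClient) {
        // bulkAsync forwards the response (or failure) to the listener that the
        // BulkProcessor supplies, so beforeBulk/afterBulk actually get called.
        BiConsumer<BulkRequest, ActionListener<BulkResponse>> consumer =
                (request, bulkListener) ->
                        restHighLevelClient.bulkAsync(request, RequestOptions.DEFAULT, bulkListener);

        return BulkProcessor.builder(consumer, getListener()).setConcurrentRequests(0).build();
    }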
