Hello everyone,
I'm trying to insert data into ElasticSearch using RestHighLevelClient and BulkProcessor. The code I'm using for the BulkProcessor is the following.
BulkProcessor getBulkProcessor(RestHighLevelClient restHighLevelClient) {
BiConsumer<BulkRequest, ActionListener<BulkResponse>> consumer = (request, bulkListener) -> {
try {
restHighLevelClient.bulk(request, RequestOptions.DEFAULT);
} catch (IOException e) {
final String message = "Failed to create BulkProcessor for ElasticSearch";
throw new RuntimeException(message, e);
}
};
return BulkProcessor.builder(consumer, getListener()).setConcurrentRequests(0).build();
}
BulkProcessor.Listener getListener() {
return new BulkProcessor.Listener() {
@Override
public void beforeBulk(long executionId, BulkRequest request) {
int numberOfActions = request.numberOfActions();
System.out.println(String.format("Executing bulk %d with %d requests", executionId, numberOfActions));
}
@Override
public void afterBulk(long executionId, BulkRequest request, BulkResponse bulkResponse) {
if (bulkResponse.hasFailures()) {
for (BulkItemResponse bulkItemResponse : bulkResponse) {
if (bulkItemResponse.isFailed()) {
BulkItemResponse.Failure failure = bulkItemResponse.getFailure();
throw new RuntimeException(String.format("Adding document %s to ElasticSearch failed", failure.getId()),
failure.getCause());
}
}
System.out.println(String.format("Bulk %d executed with failures", executionId));
} else {
System.out.println(String.format("Bulk %d completed in %d milliseconds", executionId, bulkResponse.getTook().getMillis()));
}
}
@Override
public void afterBulk(long executionId, BulkRequest request, Throwable failure) {
System.out.println(String.format("Failed to execute bulk %s", failure));
throw new RuntimeException(failure);
}
};
}
then
public static void main(String[] args) {
BulkProcessor bulkProcessor = getBulkProcessor(/* pass RestHighLevelClient there */);
DocWriteRequest docWriteRequest = /* logic to have a DocWriteRequest */;
bulkProcessor.add(docWriteRequest);
bulkProcessor.flush();
bulkProcessor.close();
return null;
}
My problem is the following :
My ElasticSearch instance successfully add a new document. But my BulkProcessor doesn't seems to receive a response and AfterBulk is never executed (hence the program doesn't terminate because I add elements in a sync manner).
Does something seems to be wrong with what I'm doing or am I missing something ?
Thanks for you help !