RefreshPolicy WAIT_UNTIL does not work when using BulkProcessor in Java Client

Hello everyone,

I am a beginner, and my English is not very good. I am using ES 7.10 with Java, so I am using the Java High Level REST Client.

I want newly indexed documents to be searchable immediately after a bulk indexing operation, and I learned that setting the refreshPolicy can achieve this. However, I ran into trouble. When I set the refreshPolicy to WAIT_UNTIL, a search issued right after inserting or modifying documents does not return the new results. When I set the refreshPolicy to IMMEDIATE, it works as expected.

I'm not sure about the reasons behind this behavior, and I would appreciate it if someone could explain it to me.

Thank you.


    @Autowired
    private RestHighLevelClient restHighLevelClient;

    @GetMapping("/add")
    public void ttt() {
        try (BulkProcessor bulkProcessor = this.getBulkProcessor(WriteRequest.RefreshPolicy.WAIT_UNTIL)) {
            for (int i = 0; i < 10; i++) {
                JSONObject jsonObject = new JSONObject();
                jsonObject.put("sort", Math.random());

                bulkProcessor.add(
                        new IndexRequest("test_index")
                                .id(String.valueOf(i))
                                .source(jsonObject.toJSONString(), XContentType.JSON)
                );
            }
        }
    }

    @GetMapping("/search")
    public Object search() throws Exception {
        SearchResponse response = restHighLevelClient.search(
                new SearchRequest("test_index")
                        .source(
                                new SearchSourceBuilder()
                                        .size(10)
                                        .sort("sort")
                        ),
                RequestOptions.DEFAULT
        );

        log.info("Search response: {} hits", response.getHits().getHits().length);

        return Arrays.stream(response.getHits().getHits())
                .map(i -> i.getId() + "=" + i.getSourceAsMap().get("sort"))
                .collect(Collectors.toList());
    }

    private BulkProcessor getBulkProcessor(WriteRequest.RefreshPolicy refreshPolicy) {
        return BulkProcessor.builder(
                        (request, bulkListener) -> {
                            if (refreshPolicy != null) {
                                request.setRefreshPolicy(refreshPolicy);
                            }

                            restHighLevelClient.bulkAsync(request, RequestOptions.DEFAULT, bulkListener);
                        },
                        new BulkProcessor.Listener() {
                            @Override
                            public void beforeBulk(long executionId,
                                                   BulkRequest request) {
                                log.info("before, indices={}, numberOfActions={}, executionId={}", request.getIndices(), request.numberOfActions(), executionId);
                            }

                            @Override
                            public void afterBulk(long executionId,
                                                  BulkRequest request,
                                                  BulkResponse response) {
                                log.info("after Response, indices={}, numberOfActions={}, executionId={}", request.getIndices(), request.numberOfActions(), executionId);
                            }

                            @Override
                            public void afterBulk(long executionId,
                                                  BulkRequest request,
                                                  Throwable failure) {
                                log.error("after Throwable, indices={}, failure={}, executionId={}", request.getIndices(), failure, executionId);
                            }
                        })
                .setBulkActions(5000)
                .setBulkSize(new ByteSizeValue(100, ByteSizeUnit.MB))
                .setFlushInterval(TimeValue.timeValueSeconds(10))
                .setConcurrentRequests(10)
                .setBackoffPolicy(
                        BackoffPolicy.exponentialBackoff(TimeValue.timeValueMillis(5000), 5)
                )
                .build();
    }

Welcome!

I guess you are trying this in an integration test, right?

The problem here, I think, is that you are using a bulk processor, which sends your data to Elasticsearch asynchronously at some later point.
You need to flush the processor manually (or close it) before running your tests, so that all documents have been sent and the refresh has happened.

In production, I would not set the refresh policy at all and would keep the default behavior, unless you really have a business constraint.

2 more comments:

  • 7.10 is very old. Please upgrade to 7.17.11, or better, switch to 8.8.2.
  • Switch to the new Java client.

Thank you for your reply, and sorry for the late answer. I have been busy these days and had not checked the replies to this discussion.

As you can see, I declared the bulkProcessor with try-with-resources ("try (BulkProcessor bulkProcessor = ...)"), as suggested by IntelliJ IDEA, so it is closed automatically. Initially I called the close method manually, but WAIT_UNTIL did not work as expected in that case either.

Indeed, refreshing does increase the system load. What I want to build is a list sorted by status, such as urgent, normal, completed, and failed. Users can select multiple items in the list and change their status in bulk. After the change, I want to reload the page immediately and see the items in the correct order. However, as I mentioned before, WAIT_UNTIL does not achieve this.

Thank you

In that case, IMO you don't need a BulkProcessor; just call the bulk API directly.
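On 7.10 with the High Level REST Client, such a direct call could look like the minimal sketch below. It assumes the documents arrive as maps keyed by document id; the class DirectBulk and its method names are hypothetical. Because client.bulk is synchronous and the request uses WAIT_UNTIL, indexAll should return only after a refresh has made the changes visible to search.

```java
import org.elasticsearch.action.bulk.BulkRequest;
import org.elasticsearch.action.bulk.BulkResponse;
import org.elasticsearch.action.index.IndexRequest;
import org.elasticsearch.action.support.WriteRequest;
import org.elasticsearch.client.RequestOptions;
import org.elasticsearch.client.RestHighLevelClient;

import java.io.IOException;
import java.util.Map;

// Hypothetical helper: one bulk request per user action, no BulkProcessor involved.
final class DirectBulk {

    // Builds a single bulk request that indexes every document.
    static BulkRequest buildRequest(String index, Map<String, Map<String, Object>> docsById) {
        // WAIT_UNTIL: the bulk response is held back until a refresh makes the changes searchable
        BulkRequest bulk = new BulkRequest().setRefreshPolicy(WriteRequest.RefreshPolicy.WAIT_UNTIL);
        docsById.forEach((id, doc) -> bulk.add(new IndexRequest(index).id(id).source(doc)));
        return bulk;
    }

    // Synchronous call: returns only after the bulk (and its wait for a refresh) has completed.
    static void indexAll(RestHighLevelClient client, String index,
                         Map<String, Map<String, Object>> docsById) throws IOException {
        BulkResponse response = client.bulk(buildRequest(index, docsById), RequestOptions.DEFAULT);
        if (response.hasFailures()) {
            throw new IllegalStateException(response.buildFailureMessage());
        }
    }
}
```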

With the new client, you could write something like this:
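A minimal sketch with the Elasticsearch Java API Client (co.elastic.clients), assuming an already configured ElasticsearchClient. The class StatusBulkUpdater, its methods, and the map-based documents are hypothetical. Refresh.WaitFor is the new client's counterpart of WAIT_UNTIL.

```java
import co.elastic.clients.elasticsearch.ElasticsearchClient;
import co.elastic.clients.elasticsearch._types.Refresh;
import co.elastic.clients.elasticsearch.core.BulkRequest;
import co.elastic.clients.elasticsearch.core.BulkResponse;
import co.elastic.clients.elasticsearch.core.bulk.BulkResponseItem;

import java.io.IOException;
import java.util.Map;

// Hypothetical helper for the "change status in bulk, then reload the list" use case.
final class StatusBulkUpdater {

    // Builds one bulk request that indexes every document and waits for a refresh.
    static BulkRequest buildRequest(String index, Map<String, Map<String, Object>> docsById) {
        BulkRequest.Builder br = new BulkRequest.Builder();
        // wait_for: the response is returned only once the changes are visible to search
        br.refresh(Refresh.WaitFor);
        for (Map.Entry<String, Map<String, Object>> e : docsById.entrySet()) {
            br.operations(op -> op.index(idx -> idx
                    .index(index)
                    .id(e.getKey())
                    .document(e.getValue())));
        }
        return br.build();
    }

    // Blocking call: when it returns, a subsequent search sees the new documents.
    static void indexAll(ElasticsearchClient client, String index,
                         Map<String, Map<String, Object>> docsById) throws IOException {
        BulkResponse response = client.bulk(buildRequest(index, docsById));
        if (response.errors()) {
            for (BulkResponseItem item : response.items()) {
                if (item.error() != null) {
                    throw new IllegalStateException("Failed to index " + item.id() + ": " + item.error().reason());
                }
            }
        }
    }
}
```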

HTH

You are right: calling restHighLevelClient.bulk directly works.

It also made me realize that the cause might be the way I called restHighLevelClient.bulkAsync inside BulkProcessor.builder, which I had copied from the documentation and other online sources. Since bulkAsync runs asynchronously, nothing waits for it to complete, which could explain the lack of synchronization, even though an asynchronous call seems to be the more appropriate choice inside a BulkProcessor. While experimenting, I replaced bulkAsync with bulk, and it worked.

Anyway, thank you very much for your explanation.

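                        (request, bulkListener) -> {
                            if (configDTO.getRefreshPolicy() != null) {
                                request.setRefreshPolicy(configDTO.getRefreshPolicy());
                            }

                            try {
                                // Synchronous call: blocks until the bulk (including any WAIT_UNTIL refresh) completes
                                BulkResponse response = restHighLevelClient.bulk(request, RequestOptions.DEFAULT);
                                // Complete the listener so afterBulk() runs and the processor releases its permit
                                bulkListener.onResponse(response);
                            } catch (IOException e) {
                                bulkListener.onFailure(e);
                            }
                        }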

Indeed. bulkAsync executes asynchronously, so you have no control over when it is going to finish.
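This also explains why closing the processor did not help. In the 7.x High Level REST Client, BulkProcessor.close() simply calls awaitClose(0, TimeUnit.NANOSECONDS): it flushes the pending actions, but with concurrent requests enabled (setConcurrentRequests(10) in the original code) it does not wait for in-flight bulks, so the /add endpoint can return before the WAIT_UNTIL bulk has finished. Calling bulkProcessor.awaitClose(30, TimeUnit.SECONDS) instead (the timeout is an arbitrary example) should block until those bulks complete; setConcurrentRequests(0) is another option, since each flush then runs synchronously. The sketch below is a simplified, hypothetical model in plain Java, not the Elasticsearch implementation: BulkCloseModel and the serverReply latch, which stands in for the refresh delay, are invented for illustration.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Semaphore;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical, simplified model of a bulk processor with concurrent requests.
// It is NOT the Elasticsearch source; it only mimics the permit-counting idea.
final class BulkCloseModel {
    private final int concurrentRequests;
    private final Semaphore semaphore;
    private final ExecutorService executor = Executors.newCachedThreadPool();
    private final CountDownLatch serverReply; // stands in for the WAIT_UNTIL refresh delay
    final AtomicInteger completedBulks = new AtomicInteger();

    BulkCloseModel(int concurrentRequests, CountDownLatch serverReply) {
        this.concurrentRequests = concurrentRequests;
        this.semaphore = new Semaphore(concurrentRequests);
        this.serverReply = serverReply;
    }

    // Sends one bulk in the background (like bulkAsync) and returns immediately.
    void flush() {
        semaphore.acquireUninterruptibly(); // one permit per in-flight bulk
        executor.execute(() -> {
            try {
                serverReply.await(); // the "response" arrives only after the simulated refresh
                completedBulks.incrementAndGet();
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            } finally {
                semaphore.release(); // the completion callback gives the permit back
            }
        });
    }

    // Flushes, then waits up to the timeout until every permit is free again.
    boolean awaitClose(long timeout, TimeUnit unit) {
        flush();
        try {
            boolean allDone = semaphore.tryAcquire(concurrentRequests, timeout, unit);
            if (allDone) {
                semaphore.release(concurrentRequests);
            }
            return allDone;
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return false;
        } finally {
            executor.shutdown(); // bulks already submitted still run to completion
        }
    }

    // Mirrors the 7.x close(): a zero timeout, so it never waits for in-flight bulks.
    // (It returns the result here only so the behavior can be checked.)
    boolean close() {
        return awaitClose(0, TimeUnit.NANOSECONDS);
    }
}
```

In the model, close() returns false while the simulated bulk is still pending, whereas awaitClose with a real timeout returns true only after the bulk has completed.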
