Hello everyone,
I am a beginner, and my English is not very good. I am using ES 7.10 with the Java programming language, so I am using the Java High-Level REST Client.
I want to be able to search for relevant content immediately after bulk indexing, and I have learned that setting the refreshPolicy can solve this issue. However, I encountered some trouble. When I set the refreshPolicy to WAIT_UNTIL, I am unable to immediately search for relevant results after inserting or modifying an index. But when I set the refreshPolicy to IMMEDIATE, it works as expected.
I'm not sure about the reasons behind this behavior, and I would appreciate it if someone could explain it to me.
Thank you.
@Autowired
private RestHighLevelClient restHighLevelClient;
@GetMapping("/add")
public void ttt() {
try (BulkProcessor bulkProcessor = this.getBulkProcessor(WriteRequest.RefreshPolicy.WAIT_UNTIL)) {
for (int i = 0; i < 10; i++) {
JSONObject jsonObject = new JSONObject();
jsonObject.put("sort", Math.random());
bulkProcessor.add(
new IndexRequest("test_index")
.id(String.valueOf(i))
.source(jsonObject.toJSONString(), XContentType.JSON)
);
}
}
}
@GetMapping("/search")
public Object search() throws Exception {
SearchResponse response = restHighLevelClient.search(
new SearchRequest("test_index")
.source(
new SearchSourceBuilder()
.size(10)
.sort("sort")
),
RequestOptions.DEFAULT
);
log.info("Search response: " + response.getHits().getHits());
return Arrays.stream(response.getHits().getHits())
.map(i -> i.getId() + "=" + i.getSourceAsMap().get("sort"))
.collect(Collectors.toList());
}
private BulkProcessor getBulkProcessor(WriteRequest.RefreshPolicy refreshPolicy) {
return BulkProcessor.builder(
(request, bulkListener) -> {
if (refreshPolicy != null) {
request.setRefreshPolicy(refreshPolicy);
}
restHighLevelClient.bulkAsync(request, RequestOptions.DEFAULT, bulkListener);
},
new BulkProcessor.Listener() {
@Override
public void beforeBulk(long executionId,
BulkRequest request) {
log.info("before, indices={}, numberOfActions={}, executionId={}", request.getIndices(), request.numberOfActions(), executionId);
}
@Override
public void afterBulk(long executionId,
BulkRequest request,
BulkResponse response) {
log.info("after Response, indices={}, numberOfActions={}, executionId={}", request.getIndices(), request.numberOfActions(), executionId);
}
@Override
public void afterBulk(long executionId,
BulkRequest request,
Throwable failure) {
log.error("after Throwable, indices={}, failure={}, executionId={}", request.getIndices(), failure, executionId);
}
})
.setBulkActions(5000)
.setBulkSize(new ByteSizeValue(100, ByteSizeUnit.MB))
.setFlushInterval(TimeValue.timeValueSeconds(10))
.setConcurrentRequests(10)
.setBackoffPolicy(
BackoffPolicy.exponentialBackoff(TimeValue.timeValueMillis(5000), 5)
)
.build();
}