Wait_for not working as expected

Hello,

I'm running a test that indexes documents with the 'wait_for' refresh policy through a bulk processor. This bulk processor flushes documents every 500ms. The index refresh interval is 60s for this example.

When I run a search for the previously indexed document, I expect the search to be held for several seconds until a refresh occurs. Instead, the search is executed immediately and returns no results (because the document is not visible yet).

This is a trace with the sequence of actions:

(1) First a put with the document.
(2) After 200ms you'll see the bulk executed, printing the refresh policy used.
(3) Then the search is executed, the first immediately after the bulk execution and then every 5s. The request and response are printed. These are the lines (3a),(3b),(3c),(3d),(3e),(3f),(3g),(3h)
(4) Finally, at step (4), the document is returned, about 40s after the bulk execution.

(1) 19:40:57:997 :: put :: document = {"code":"TESTMC","end":"20180830","start":"20180826","creationTime":1625593257996,"expirationTime":1625593317996}

(2) 19:40:58:119 :: beforeBulk :: request refresh policy = wait_for

(3a) 19:40:58:457 :: search to execute :: request by get = '/96-*,96_0/_search?_source=expirationTime&track_total_hits=false', document = { "query" : { "bool" : { "filter" : [{"range":{"expirationTime":{"gt":1625593258457}}},{ "term" : { "code" : "TESTMC" } }] }} ,"size":3000,"sort":{"expirationTime":"asc"}}
19:40:58:463 :: search executed :: response = {"took":1,"timed_out":false,"_shards":{"total":3,"successful":3,"skipped":0,"failed":0},"hits":{"max_score":null,"hits":[]}}

(3b) 19:41:03:607 :: search to execute :: request by get = '/96-*,96_0/_search?_source=expirationTime&track_total_hits=false', document = { "query" : { "bool" : { "filter" : [{"range":{"expirationTime":{"gt":1625593263607}}},{ "term" : { "code" : "TESTMC" } }] }} ,"size":3000,"sort":{"expirationTime":"asc"}}
19:41:03:613 :: search executed :: response = {"took":1,"timed_out":false,"_shards":{"total":3,"successful":3,"skipped":0,"failed":0},"hits":{"max_score":null,"hits":[]}}

(3c) 19:41:08:742 :: search to execute :: request by get = '/96-*,96_0/_search?_source=expirationTime&track_total_hits=false', document = { "query" : { "bool" : { "filter" : [{"range":{"expirationTime":{"gt":1625593268742}}},{ "term" : { "code" : "TESTMC" } }] }} ,"size":3000,"sort":{"expirationTime":"asc"}}
19:41:08:748 :: search executed :: response = {"took":1,"timed_out":false,"_shards":{"total":3,"successful":3,"skipped":0,"failed":0},"hits":{"max_score":null,"hits":[]}}

(3d) 19:41:13:879 :: search to execute :: request by get = '/96-*,96_0/_search?_source=expirationTime&track_total_hits=false', document = { "query" : { "bool" : { "filter" : [{"range":{"expirationTime":{"gt":1625593273879}}},{ "term" : { "code" : "TESTMC" } }] }} ,"size":3000,"sort":{"expirationTime":"asc"}}
19:41:13:886 :: search executed :: response = {"took":1,"timed_out":false,"_shards":{"total":3,"successful":3,"skipped":0,"failed":0},"hits":{"max_score":null,"hits":[]}}

(3e) 19:41:19:015 :: search to execute :: request by get = '/96-*,96_0/_search?_source=expirationTime&track_total_hits=false', document = { "query" : { "bool" : { "filter" : [{"range":{"expirationTime":{"gt":1625593279015}}},{ "term" : { "code" : "TESTMC" } }] }} ,"size":3000,"sort":{"expirationTime":"asc"}}
19:41:19:021 :: search executed :: response = {"took":1,"timed_out":false,"_shards":{"total":3,"successful":3,"skipped":0,"failed":0},"hits":{"max_score":null,"hits":[]}}

(3f) 19:41:24:140 :: search to execute :: request by get = '/96-*,96_0/_search?_source=expirationTime&track_total_hits=false', document = { "query" : { "bool" : { "filter" : [{"range":{"expirationTime":{"gt":1625593284140}}},{ "term" : { "code" : "TESTMC" } }] }} ,"size":3000,"sort":{"expirationTime":"asc"}}
19:41:24:146 :: search executed :: response = {"took":1,"timed_out":false,"_shards":{"total":3,"successful":3,"skipped":0,"failed":0},"hits":{"max_score":null,"hits":[]}}

(3g) 19:41:29:278 :: search to execute :: request by get = '/96-*,96_0/_search?_source=expirationTime&track_total_hits=false', document = { "query" : { "bool" : { "filter" : [{"range":{"expirationTime":{"gt":1625593289278}}},{ "term" : { "code" : "TESTMC" } }] }} ,"size":3000,"sort":{"expirationTime":"asc"}}
19:41:29:283 :: search executed :: response = {"took":1,"timed_out":false,"_shards":{"total":3,"successful":3,"skipped":0,"failed":0},"hits":{"max_score":null,"hits":[]}}

(3h) 19:41:34:424 :: search to execute :: request by get = '/96-*,96_0/_search?_source=expirationTime&track_total_hits=false', document = { "query" : { "bool" : { "filter" : [{"range":{"expirationTime":{"gt":1625593294424}}},{ "term" : { "code" : "TESTMC" } }] }} ,"size":3000,"sort":{"expirationTime":"asc"}}
19:41:34:430 :: search executed :: response = {"took":1,"timed_out":false,"_shards":{"total":3,"successful":3,"skipped":0,"failed":0},"hits":{"max_score":null,"hits":[]}}

(4) 19:41:39:549 :: search to execute :: request by get = '/96-*,96_0/_search?_source=expirationTime&track_total_hits=false', document = { "query" : { "bool" : { "filter" : [{"range":{"expirationTime":{"gt":1625593299549}}},{ "term" : { "code" : "TESTMC" } }] }} ,"size":3000,"sort":{"expirationTime":"asc"}}
19:41:39:559 :: search executed :: response = {"took":4,"timed_out":false,"_shards":{"total":3,"successful":3,"skipped":0,"failed":0},"hits":{"max_score":null,"hits":[{"_index":"96-000002","_type":"_doc","_id":"96|61421625593257761|TESTMC|20180830|20180826|20180826|8.2|9.0|8.1|9.0|13.area|14.0|15.1|13.city|14.0|15.|13.accommodationCode|14.1|15.|13.priceType|14.2|15.3|13.cancelPolicies|14.2|15.1|","_score":null,"_source":{"expirationTime":1625593317996},"sort":[1625593317996]}]}}

These are the index settings:

{"96-000002":{"settings":{"index":{"lifecycle":{"name":"ventus_policy_96","rollover_alias":"96","indexing_complete":"true"},"refresh_interval":"60000ms","number_of_shards":"1","provided_name":"96-000002","creation_date":"1625594221210","number_of_replicas":"0","uuid":"FazfEG9MRYKaJjhgKvr-jQ","version":{"created":"7070099"}}}}}

This is the 'beforeBulk' where we set the refresh policy for all documents in this bulk:

public void beforeBulk(long executionId, BulkRequest request) {
    request.setRefreshPolicy(WriteRequest.RefreshPolicy.WAIT_UNTIL);
    System.out.println(FastDateFormat.getInstance("HH:mm:ss:SSS").format(System.currentTimeMillis()) + " :: beforeBulk :: request refresh policy = " + request.getRefreshPolicy().getValue());
}

If I change WriteRequest.RefreshPolicy.WAIT_UNTIL to WriteRequest.RefreshPolicy.IMMEDIATE then it works fine (as expected).

What am I missing?

Thanks,

Joan.

Hey,

I think some things are mixed up here. The refresh parameter never applies to a search, only to write operations. So when the bulk processor sends a bulk request with wait_for, that request stays open until the next refresh happens. The search request is executed independently, regardless of whether the bulk request has finished or not. Your trace does not show when the bulk request returns (you log from the beforeBulk method, which is executed before the bulk request is sent), so the bulk could be returning at any time relative to your searches.
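To make the point about bulk completion concrete, here is a minimal sketch of one way to hold the search back until the bulk request has returned. It uses only the JDK; the class BulkCompletionGate and its methods bulkReturned and awaitBulk are hypothetical names made up for this illustration, not part of the Elasticsearch client. The assumption is that you would call bulkReturned() from both afterBulk callbacks of your BulkProcessor.Listener (the one that receives a BulkResponse and the one that receives a Throwable). In the demo, a thread that sleeps for 200ms stands in for the bulk request that wait_for keeps open.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;

/** Hypothetical helper: lets a searching thread wait until a bulk request has returned. */
public class BulkCompletionGate {
    private final CountDownLatch done = new CountDownLatch(1);

    /** Assumed to be called from the listener's afterBulk callbacks (success and failure). */
    public void bulkReturned() {
        done.countDown();
    }

    /** Blocks until bulkReturned() has been called or the timeout elapses; true if the bulk returned in time. */
    public boolean awaitBulk(long timeout, TimeUnit unit) throws InterruptedException {
        return done.await(timeout, unit);
    }

    public static void main(String[] args) throws Exception {
        BulkCompletionGate gate = new BulkCompletionGate();

        // Stand-in for a bulk request that wait_for holds open until the next refresh.
        Thread bulk = new Thread(() -> {
            try {
                Thread.sleep(200);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
            gate.bulkReturned();
        });
        bulk.start();

        // Only search once the bulk has returned; with wait_for the documents are then visible.
        boolean returned = gate.awaitBulk(5, TimeUnit.SECONDS);
        System.out.println("bulk returned before search: " + returned);
        bulk.join();
    }
}
```

With a 60s refresh interval the bulk request can stay open for up to about a minute, so the timeout passed to awaitBulk would need to be longer than that in the real test.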

Hope that helps with the fundamentals.

Hi Alex,

Yes, I was totally confused.

Thanks,

Joan.