Use BulkProcessor with RefreshPolicy.WAIT_UNTIL

The current documentation [1] states how to wait for bulk requests to become visible for search:

BulkRequest request = new BulkRequest();
// wait for bulk transaction results to become visible for search
request.setRefreshPolicy(WriteRequest.RefreshPolicy.WAIT_UNTIL); 

On the other hand, it recommends the use of the BulkProcessor. How can I configure the BulkProcessor with a particular RefreshPolicy?

[1] https://www.elastic.co/guide/en/elasticsearch/client/java-rest/master/java-rest-high-document-bulk.html

Hey,

I am not sure I understand how this is supposed to work. When using the bulk processor, you have no guarantee about when a document will be indexed, only that it will be. If you do not know when it will be indexed, then waiting for a refresh does not make sense.

If you need to make sure something is immediately available for search, you can index it in a separate bulk request with refresh or wait_for, as you did above.
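For reference, RefreshPolicy.WAIT_UNTIL corresponds to the refresh=wait_for query parameter of the REST _bulk endpoint (IMMEDIATE is refresh=true, NONE is refresh=false). A minimal stdlib-only sketch of such a separate, blocking bulk request, assuming a hypothetical helper class WaitForBulk that only builds the request line and the NDJSON body (sending it is left to whatever HTTP client is in use):

```java
import java.util.LinkedHashMap;
import java.util.Map;

// Hypothetical helper (not part of any Elasticsearch client): builds a _bulk
// call whose response only returns once the documents are searchable.
final class WaitForBulk {

    // refresh=wait_for is the REST equivalent of RefreshPolicy.WAIT_UNTIL.
    static String requestLine(String index) {
        return "POST /" + index + "/_bulk?refresh=wait_for";
    }

    // One action line plus one source line per document. Assumes the payloads
    // are already single-line JSON and the ids need no JSON escaping.
    static String ndjsonBody(Map<String, String> docs) {
        StringBuilder sb = new StringBuilder();
        for (Map.Entry<String, String> e : docs.entrySet()) {
            sb.append("{\"index\":{\"_id\":\"").append(e.getKey()).append("\"}}\n");
            sb.append(e.getValue()).append('\n');
        }
        return sb.toString(); // the bulk body must end with a newline
    }

    public static void main(String[] args) {
        Map<String, String> docs = new LinkedHashMap<>();
        docs.put("1", "{\"name\":\"Berlin\"}");
        docs.put("2", "{\"name\":\"Paris\"}");
        System.out.println(requestLine("locations"));
        System.out.print(ndjsonBody(docs));
    }
}
```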

Maybe you can explain a little bit about your use case so we can see what a good implementation would look like.

--Alex

Hi,

this is my current solution, creating the BulkRequests directly:

/**
 * Add data to index as a bulk operation
 *
 * @param bulkData Map of key-value pairs (docId, json payload)
 * @param waitFor  Wait until data becomes visible (readable) in ES
 */
void bulkAddToIndex(BulkDocumentMap bulkData, boolean waitFor) throws IOException {
    final BulkRequest request = new BulkRequest();
    request.timeout(DEFAULT_TIMEOUT);
    if (waitFor) {
        request.setRefreshPolicy(WriteRequest.RefreshPolicy.WAIT_UNTIL);
    }
    for (String key : bulkData.keySet()) {
        final IndexRequest ir = new IndexRequest(indexName);
        ir.id(key);
        ir.source(bulkData.getAsJson(key), XContentType.JSON);
        request.add(ir);
    }
    elasticClient.bulk(request, RequestOptions.DEFAULT);
}

If I code it like that, I lose some of the convenience the BulkProcessor provides (e.g. it already handles bulk partitioning and request concurrency).

What I need is some kind of configuration parameter on the BulkProcessor.Builder that makes the BulkProcessor construct the individual requests with a particular refresh policy:

final BulkProcessor.Builder builder = BulkProcessor.builder(
                (request, bulkListener) ->
                        elasticClient.bulkAsync(request, RequestOptions.DEFAULT, bulkListener),
                listener);
// set refresh policy for bulk requests
builder.setBulkRequestRefreshPolicy(WriteRequest.RefreshPolicy.WAIT_UNTIL);

Does that make sense?

Hey,

To me this does not make sense: the bulk processor does not block, so you do not know when it will send that request. If you want the bulk processor to block, you would need to disable concurrency and call flush + refresh manually, but then you lose the features you mentioned.

To me it's either a manual bulk request with refresh/wait_for or the bulk processor, but nothing in between.

May I ask about the use case? Since a get request returns a document immediately (it is real-time), you may not need to run a search within the next refresh interval, but that's just an assumption.

--Alex

Our application uses ES as a fast search/query index for a relational database that stores geographical data.

I use bulk requests to sync the contents occasionally and on demand, e.g. when new data is entered by the user (or mass-imported) and checked afterwards within the application. During such manual edits/checks it is important that the changes to the database and the index are virtually atomic: on import completion, the editing application fires a search query against the ES index to display the location data within the viewport's geospatial extent immediately after the edit/import. If the index is not synchronized, some locations are missing.

I agree that I cannot influence when an individual bulk request starts. But what I could do (assuming the BulkProcessor permitted configuring the RefreshPolicy) is synchronize on the submitted bulk processor upon termination, e.g. via processor.awaitClose(). In that case I would have fine-grained control over the post-conditions of the submitted set of bulk requests (e.g. that upon clean termination, all submitted requests have satisfied RefreshPolicy.WAIT_UNTIL).
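The idea of stamping a refresh policy on every batch and then synchronizing on awaitClose() can be sketched with a stdlib-only toy model. ToyBulkProcessor, Batch and the sender callback are hypothetical stand-ins, not Elasticsearch classes; the point is only that if the callback sets wait_for on every batch, a successful awaitClose implies every batch has become searchable. In the High Level REST Client, the corresponding hook would presumably be the lambda passed to BulkProcessor.builder, which receives each BulkRequest before bulkAsync is called, so request.setRefreshPolicy(WriteRequest.RefreshPolicy.WAIT_UNTIL) could be set there; this has not been verified against a specific client version.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.function.Consumer;

// Toy model of a bulk processor (NOT the Elasticsearch class): documents are
// buffered, cut into batches and handed asynchronously to a sender callback.
final class ToyBulkProcessor {

    // One outgoing batch plus the refresh setting it will be sent with.
    static final class Batch {
        final List<String> docs;
        volatile String refresh = "false"; // REST default: do not refresh
        Batch(List<String> docs) { this.docs = docs; }
    }

    private final int batchSize;
    private final Consumer<Batch> sender;
    private final ExecutorService pool;
    private List<String> pending = new ArrayList<>();

    ToyBulkProcessor(int batchSize, int concurrentRequests, Consumer<Batch> sender) {
        this.batchSize = batchSize;
        this.sender = sender;
        this.pool = Executors.newFixedThreadPool(concurrentRequests);
    }

    // Buffers a document; a full buffer is flushed without blocking the caller.
    synchronized void add(String doc) {
        pending.add(doc);
        if (pending.size() >= batchSize) {
            flush();
        }
    }

    // Hands the current buffer to the sender on a worker thread.
    synchronized void flush() {
        if (pending.isEmpty()) {
            return;
        }
        Batch batch = new Batch(pending);
        pending = new ArrayList<>();
        pool.execute(() -> sender.accept(batch));
    }

    // Flushes the remainder and waits for every in-flight batch to finish,
    // analogous to BulkProcessor.awaitClose(timeout, unit).
    boolean awaitClose(long timeout, TimeUnit unit) {
        flush();
        pool.shutdown();
        try {
            return pool.awaitTermination(timeout, unit);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return false;
        }
    }

    public static void main(String[] args) {
        ToyBulkProcessor processor = new ToyBulkProcessor(2, 2, batch -> {
            // The hook sees each batch before it is sent, so it can set a policy on it.
            batch.refresh = "wait_for";
            System.out.println("sending " + batch.docs + " refresh=" + batch.refresh);
        });
        for (int i = 0; i < 5; i++) {
            processor.add("doc" + i);
        }
        // If this prints true, every batch was sent with refresh=wait_for and completed.
        System.out.println("all done: " + processor.awaitClose(10, TimeUnit.SECONDS));
    }
}
```

The trade-off Alex describes still applies: each wait_for batch occupies a concurrency slot until the next refresh (1s by default), and awaitClose shuts the processor down, so a new processor would be needed for every import.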

If you need data to be immediately searchable after storing it, do not use the bulk processor in this case; just send a single request with refresh or wait_for. However, if you don't need to search but just retrieve the document, then the get document API might be sufficient.
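To illustrate why a get can succeed where a search does not (yet), here is a toy in-memory model. ToyIndex and its methods are hypothetical and deliberately simplified; it is not how Elasticsearch implements realtime get, it only mirrors the visible behavior: get sees the latest write, while search only sees what was present at the last refresh.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.Optional;

// Toy model (NOT Elasticsearch internals) of why a realtime get can see a
// document that a search cannot see until the next refresh.
final class ToyIndex {
    private final Map<String, String> latest = new HashMap<>();     // every acknowledged write
    private final Map<String, String> searchable = new HashMap<>(); // snapshot taken at the last refresh

    // Indexing makes the document gettable, but not yet searchable.
    synchronized void index(String id, String json) {
        latest.put(id, json);
    }

    // Realtime get: reads the latest version, refreshed or not.
    synchronized Optional<String> get(String id) {
        return Optional.ofNullable(latest.get(id));
    }

    // Search only sees the snapshot from the last refresh.
    synchronized boolean searchFinds(String id) {
        return searchable.containsKey(id);
    }

    // A refresh (periodic, forced by refresh=true, or awaited by wait_for)
    // publishes all writes so far to search.
    synchronized void refresh() {
        searchable.clear();
        searchable.putAll(latest);
    }

    public static void main(String[] args) {
        ToyIndex index = new ToyIndex();
        index.index("42", "{\"name\":\"Berlin\"}");
        System.out.println(index.get("42").isPresent()); // true: get is realtime
        System.out.println(index.searchFinds("42"));     // false: not refreshed yet
        index.refresh();
        System.out.println(index.searchFinds("42"));     // true after the refresh
    }
}
```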

This is exactly what I am doing now. I just wanted to make sure there is no other way to achieve the same goal with better concurrency.

