Bulk index API with upsert


(Navneet Mathpal) #1

Hi,

I am using the bulk API with upsert, and sometimes it gives me unusual results
when I send the documents in batches of 200.

BulkProcessor bulkProcessor = BulkProcessor.builder(client, new BulkProcessor.Listener() {
    @Override
    public void beforeBulk(long executionId, BulkRequest request) {
        System.out.println("Going to execute new bulk composed of " + request.numberOfActions() + " actions");
    }

    @Override
    public void afterBulk(long executionId, BulkRequest request, BulkResponse response) {
        System.out.println("Executed bulk composed of " + request.numberOfActions() + " actions");
    }

    @Override
    public void afterBulk(long executionId, BulkRequest request, Throwable failure) {
        System.out.println("Error executing bulk: " + failure);
    }
}).setBulkActions(200).setFlushInterval(TimeValue.timeValueMinutes(5)).setConcurrentRequests(10).build();

for (int i = 0; i <= 400; i++) {
    IndexRequest indexRequest = new IndexRequest(indexName, indexType, "your-doc-ID")
        .source(jsondoc);

    UpdateRequest updateRequest = new UpdateRequest(indexName, indexType, "your-doc-ID")
        .doc(jsonBuilder()
            .startObject()
            .field("LastUpdateTime", "datetime" + i)
            .endObject())
        .upsert(indexRequest);
    bulkProcessor.add(updateRequest);
    System.out.println("Done updating");
}

So each time LastUpdateTime gets updated, but sometimes the final value is datetime200 and sometimes datetime400.

When I increase my bulk size to 1000 it works fine,
but when the bulk size is 200, the final value is sometimes datetime200 and sometimes datetime400.

Could you please let me know why it shows this unusual, non-deterministic behavior?


(Mark Harwood) #2

Looks like you have concurrent requests updating the same doc with values that were incremented in the client. The sequencing of these updates is not controlled, so the final outcome will be non-deterministic.
If you want to increment something in a multi-threaded fashion you need to use the optimistic locking controls we make available to do this - see [1] with particular reference to version numbers and retry_on_conflict.

[1] https://www.elastic.co/guide/en/elasticsearch/reference/2.1/docs-update.html
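The "last write wins" race described above can be reproduced in plain Java, with no Elasticsearch involved. This is a minimal sketch (the class and method names are illustrative, not from the thread): two worker threads stand in for two concurrent bulk requests, each writing its own pre-incremented values to a shared slot.

```java
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;

public class LastWriteWins {

    // Two "batches" race to write pre-incremented values into the same slot,
    // mimicking two concurrent bulk requests updating the same document ID.
    static String race(int perBatch) throws InterruptedException {
        AtomicReference<String> lastUpdateTime = new AtomicReference<>("unset");
        ExecutorService pool = Executors.newFixedThreadPool(2);
        for (int b = 0; b < 2; b++) {
            final int start = b * perBatch;
            pool.submit(() -> {
                for (int i = start; i < start + perBatch; i++) {
                    lastUpdateTime.set("datetime" + i);
                }
            });
        }
        pool.shutdown();
        pool.awaitTermination(10, TimeUnit.SECONDS);
        // Whichever batch happened to finish last determines the final value.
        return lastUpdateTime.get();
    }

    public static void main(String[] args) throws InterruptedException {
        // Prints either datetime199 or datetime399, depending on scheduling.
        System.out.println(race(200));
    }
}
```

Whichever thread writes last determines the final value, so repeated runs print datetime199 or datetime399 unpredictably, which is exactly the datetime200/datetime400 flip-flop reported above.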


(Navneet Mathpal) #3

Hi,

When I used the same code with retry_on_conflict, it still gave me non-deterministic results.
But how can I use optimistic locking, and how can I maintain the version number for each doc when sending it via the Java API? I guess for that I would first have to fetch the version number using the get API, right?


(Mark Harwood) #4

Retry on conflict tackles conflicts that may occur in the window between an update operation reading a doc and writing it back. A fundamental assumption of retrying updates is that, in the event of a conflict, it is legitimate to retry that update operation on the latest version.
Clearly this is not the case if you are sending a sequence of pre-incremented values via a multi-threaded system that doesn't preserve this sequence. If you follow the examples for incrementing in our docs, you'll notice that what is sent is a series of scripts that perform the increment action local to the data, as these are safely retryable when there are conflicts.
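Following the linked docs, a scripted increment combined with retry_on_conflict might look like the sketch below. The index, type, and field names are placeholders, and this assumes an ES 2.x cluster with inline scripting enabled (it is disabled by default):

```
curl -XPOST 'localhost:9200/myindex/mytype/your-doc-ID/_update?retry_on_conflict=5' -d '{
    "script" : {
        "inline" : "ctx._source.counter += 1"
    },
    "upsert" : {
        "counter" : 0
    }
}'
```

Because the script computes the new value on the shard at write time, rather than in the client, retrying it after a version conflict produces the correct result regardless of the order in which concurrent requests arrive.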


(system) #5