Missing records when using BulkProcessor (Java High Level Rest Client)

Hi,

I use BulkProcessor to index records, and I see that some records go missing during bulk indexing. I don't see any exceptions or errors, and the afterBulk callback reports no failures. The BulkProcessor is a singleton Spring bean, so I don't explicitly call awaitClose or flush. Any help in tracking down this issue would be greatly appreciated.

this.bulkProcessor = BulkProcessor.builder(
        (request, bulkListener) -> esClient.bulkAsync(request, RequestOptions.DEFAULT, bulkListener),
        new BulkProcessor.Listener() {
            @Override
            public void beforeBulk(long executionId, BulkRequest request) {
                logger.debug("going to execute bulk of {} requests", request.numberOfActions());
            }

            @Override
            public void afterBulk(long executionId, BulkRequest request, BulkResponse response) {
                logger.debug("bulk executed {} failures", response.hasFailures() ? "with" : "without");
            }

            @Override
            public void afterBulk(long executionId, BulkRequest request, Throwable failure) {
                logger.warn("error while executing bulk", failure);
            }
        })
        .setBulkActions(1000)
        .setFlushInterval(TimeValue.timeValueSeconds(5))
        .build();
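Since the listener above only checks `hasFailures()`, one thing worth ruling out is a partial bulk failure: a bulk request can succeed overall while individual items are rejected. A sketch of a listener that logs each failed item (class name `ItemFailureListener` is hypothetical, not from the original post):

```java
import org.elasticsearch.action.bulk.BulkItemResponse;
import org.elasticsearch.action.bulk.BulkProcessor;
import org.elasticsearch.action.bulk.BulkRequest;
import org.elasticsearch.action.bulk.BulkResponse;

// Logs every failed item instead of only checking hasFailures().
// Rejections and mapping errors surface on the individual item responses,
// including the document id, which helps pinpoint which records went missing.
public class ItemFailureListener implements BulkProcessor.Listener {

    @Override
    public void beforeBulk(long executionId, BulkRequest request) { }

    @Override
    public void afterBulk(long executionId, BulkRequest request, BulkResponse response) {
        for (BulkItemResponse item : response.getItems()) {
            if (item.isFailed()) {
                System.err.printf("bulk [%d] item failed: index=%s, id=%s, reason=%s%n",
                        executionId, item.getIndex(), item.getId(), item.getFailureMessage());
            }
        }
    }

    @Override
    public void afterBulk(long executionId, BulkRequest request, Throwable failure) {
        failure.printStackTrace();
    }
}
```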

You probably need to close the bulk processor before exiting your application if your application is stopped at some point.

Ours is a continuously running application that keeps indexing data as events occur. We do not stop the application explicitly unless a deployment is needed. Also, I call close() on the BulkProcessor when the Spring bean is about to be destroyed by the container.
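One detail worth noting here: close() flushes any remaining buffered actions but does not wait for in-flight bulk requests to complete, whereas awaitClose(timeout, unit) does. A minimal sketch of draining on shutdown (wrapper class and timeout value are illustrative assumptions):

```java
import java.util.concurrent.TimeUnit;
import javax.annotation.PreDestroy;
import org.elasticsearch.action.bulk.BulkProcessor;

// Drains the BulkProcessor on shutdown. awaitClose(...) both flushes the
// buffer and waits for in-flight async bulk requests to finish, so records
// queued just before a deployment are not silently dropped.
public class BulkProcessorLifecycle {

    private final BulkProcessor bulkProcessor;

    public BulkProcessorLifecycle(BulkProcessor bulkProcessor) {
        this.bulkProcessor = bulkProcessor;
    }

    @PreDestroy
    public void shutdown() throws InterruptedException {
        boolean terminated = bulkProcessor.awaitClose(30, TimeUnit.SECONDS);
        if (!terminated) {
            System.err.println("BulkProcessor did not terminate within 30s; some requests may be lost");
        }
    }
}
```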

One observation: if the application is running on only one node there are no missing records, but if the application runs on multiple nodes, records go missing.

The code you shared seems ok though. So it might be something else.
How are you checking for missing records?

For a given requestId I get the count from both Elasticsearch and my database, and the two do not match.

POST /data_v1/_count
{
  "query": {
    "bool": {
      "must": [
        {
          "term": {
            "requestId": {
              "value": "8f0bcc60-0b91-11ea-a375-1b01714de23a"
            }
          }
        }
      ]
    }
  }
}

Are you waiting for the refresh to happen before calling this?
Or any chance you have duplicated Ids?

Yes, I wait for the refresh to happen before this. There is no chance of duplicate IDs, since the same code works correctly on a single node.
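A refresh only makes documents searchable that have already reached Elasticsearch; documents still buffered in a BulkProcessor (up to the 5 s flush interval or 1000 actions) are invisible to _count no matter how often the index is refreshed, and with multiple application nodes each JVM has its own buffer. A sketch of flushing the local buffer before refreshing, to rule that out (helper class is hypothetical):

```java
import java.io.IOException;
import org.elasticsearch.action.admin.indices.refresh.RefreshRequest;
import org.elasticsearch.action.bulk.BulkProcessor;
import org.elasticsearch.client.RequestOptions;
import org.elasticsearch.client.RestHighLevelClient;

// Pushes locally buffered bulk actions to Elasticsearch, then refreshes the
// index so a subsequent _count sees everything this JVM has queued so far.
public final class CountPreparation {

    public static void flushAndRefresh(BulkProcessor bulkProcessor,
                                       RestHighLevelClient client,
                                       String index) throws IOException {
        bulkProcessor.flush(); // send buffered actions now instead of waiting for the interval
        client.indices().refresh(new RefreshRequest(index), RequestOptions.DEFAULT);
    }
}
```

Note that flush() only drains the processor in the current JVM; with the application deployed on several nodes, each node would have to flush its own BulkProcessor before the count is comparable.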

Could you share the code part where you are adding the index request to the BulkProcessor ?

Below is the code, please let me know if this is good enough.

@Component
public class ElasticSearchTemplate implements InitializingBean {

    @Autowired
    private RestHighLevelClient restClient;
    private BulkProcessor bulkProcessor;

    @Value("${taxilla.es.initial.retry.wait.time:500}")
    private int INITIAL_RETRY_WAIT_TIME;
    @Value("${taxilla.es.bulk.num.of.actions:1000}")
    private int BULK_NUM_OF_ACTIONS;
    @Value("${taxilla.es.bulk.index.size:3}")
    private int BULK_INDEX_SIZE;
    @Value("${taxilla.es.bulk.flush.interval:5}")
    private int BULK_FLUSH_INTERVAL;
    @Value("${taxilla.es.index.failure.retry.count:5}")
    private int INDEX_FAILURE_RETRY_COUNT;

    private static final Logger LOGGER = LoggerFactory.getLogger(ElasticSearchTemplate.class);

    @Override
    public void afterPropertiesSet() {
        BulkProcessor.Builder bulkBuilder = BulkProcessor.builder(
                (request, bulkListener) -> restClient.bulkAsync(request, RequestOptions.DEFAULT, bulkListener),
                new BulkProcessor.Listener() {
                    @Override
                    public void beforeBulk(long executionId, BulkRequest request) {
                        int numberOfActions = request.numberOfActions();
                        LOGGER.info("beforeBulk Executing bulk [{}] with [{}] requests and {} size", executionId, numberOfActions, request.estimatedSizeInBytes());
                    }

                    @Override
                    public void afterBulk(long executionId, BulkRequest request, BulkResponse response) {
                        try {
                            if (response.hasFailures()) {
                                LOGGER.warn("afterBulk Bulk [{}] executed with failures :[{}]", executionId, response.buildFailureMessage());
                            } else {
                                LOGGER.info("afterBulk Bulk [{}] completed in {} milliseconds", executionId, response.getTook().getMillis());
                            }
                        } catch (Exception ex) {
                            LOGGER.error("after bulk error", ex);
                        }
                    }

                    @Override
                    public void afterBulk(long executionId, BulkRequest request, Throwable failure) {
                        LOGGER.error("afterBulk Error, Failed to execute bulk", failure);
                    }
                })
                .setBulkActions(BULK_NUM_OF_ACTIONS)
                .setConcurrentRequests(1)
                .setFlushInterval(TimeValue.timeValueSeconds(BULK_FLUSH_INTERVAL))
                .setBackoffPolicy(BackoffPolicy.exponentialBackoff(TimeValue.timeValueMillis(INITIAL_RETRY_WAIT_TIME), INDEX_FAILURE_RETRY_COUNT));

        this.bulkProcessor = bulkBuilder
                .setBulkSize(new ByteSizeValue(BULK_INDEX_SIZE, ByteSizeUnit.MB))
                .build();
    }

    public void refreshIndices() {
        try {
            restClient.indices().refresh(new RefreshRequest(SearchAppConstants.ASSET_INDEX_NAME), RequestOptions.DEFAULT);
        } catch (IOException ex) {
            LOGGER.error("Error while refreshing index : {}", ex.getMessage());
        }
    }

    public void index(IndexDetails indexDetails) {
        IndexRequest indexRequest = _index(indexDetails);
        bulkProcessor.add(indexRequest);
    }

    public void delete(IndexDetails indexDetails) {
        DeleteRequest deleteRequest = _delete(indexDetails);
        bulkProcessor.add(deleteRequest);
    }

    private DeleteRequest _delete(IndexDetails indexDetails) {
        return new DeleteRequest().id(indexDetails.getDocumentId())
                .routing(indexDetails.getRoutingId()).index(indexDetails.getSearchAlias());
    }

    private IndexRequest _index(IndexDetails indexDetails) {
        return new IndexRequest().routing(indexDetails.getRoutingId())
                .index(indexDetails.getIndexName()).source(indexDetails.getObject())
                .id(indexDetails.getDocumentId());
    }

}

Are you sure that you are not sending documents with the same id? Can indexDetails.getDocumentId() return duplicates?

Yes, I am sure. Earlier I used highLevelRestClient.index(), which worked fine in the cluster, but since it was too slow I moved to BulkProcessor. BulkProcessor is much faster, but now I have this problem of missing records.

Do I have to check anything with respect to the write thread pools on the Elasticsearch server? The problem is that I do not see any rejections or errors from either the Elasticsearch server or my application.
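The write thread pool stats can be checked from the application itself through the low-level client, which avoids relying on client-side callbacks alone. A sketch using the cat thread pool API (helper class is hypothetical):

```java
import java.io.IOException;
import org.apache.http.util.EntityUtils;
import org.elasticsearch.client.Request;
import org.elasticsearch.client.Response;
import org.elasticsearch.client.RestHighLevelClient;

// Fetches per-node write thread pool stats. A non-zero "rejected" column means
// some node has been rejecting bulk/index work, which can show up as missing
// documents when the retries configured in the BackoffPolicy are exhausted.
public final class ThreadPoolCheck {

    public static String writeRejections(RestHighLevelClient client) throws IOException {
        Request request = new Request("GET", "/_cat/thread_pool/write");
        request.addParameter("v", "true");
        request.addParameter("h", "node_name,name,active,queue,rejected");
        Response response = client.getLowLevelClient().performRequest(request);
        return EntityUtils.toString(response.getEntity());
    }
}
```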

I don't see how this is happening.
I have been using the bulkProcessor for years now.

It might help if you can find a document that was not indexed and try to index it manually, to see if there are rejections.
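The manual check above could be sketched like this: re-index one missing document synchronously, since unlike the BulkProcessor path a synchronous index() surfaces any rejection or mapping error immediately as a response status or an exception (helper class and parameters are illustrative):

```java
import java.io.IOException;
import org.elasticsearch.action.index.IndexRequest;
import org.elasticsearch.action.index.IndexResponse;
import org.elasticsearch.client.RequestOptions;
import org.elasticsearch.client.RestHighLevelClient;
import org.elasticsearch.common.xcontent.XContentType;

// Indexes a single document synchronously so that any failure is visible
// right away, instead of being swallowed somewhere in an async bulk.
public final class ManualIndexCheck {

    public static void reindexOne(RestHighLevelClient client, String index,
                                  String routing, String id, String json) throws IOException {
        IndexRequest request = new IndexRequest()
                .index(index)
                .id(id)
                .routing(routing)
                .source(json, XContentType.JSON);
        IndexResponse response = client.index(request, RequestOptions.DEFAULT);
        System.out.println("result=" + response.getResult() + ", status=" + response.status());
    }
}
```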