We are trying to put millions of documents through BulkProcessor into ES.
We have 10 threads that read 10 files, and each thread adds IndexRequests to the BulkProcessor.
final BiConsumer<BulkRequest, ActionListener<BulkResponse>> bulkConsumer =
    (request, bulkListener) -> highLevelClient.bulkAsync(request, RequestOptions.DEFAULT, bulkListener);
bulkProcessor =
    BulkProcessor.builder(bulkConsumer, listener)
        .setBulkActions(-1) // -1 disables flushing by number of actions
        .setFlushInterval(TimeValue.timeValueMillis(5000L)) // flush at least every 5 seconds
        .setBulkSize(new ByteSizeValue(15, ByteSizeUnit.MB)) // flush once 15 MB are buffered
        .setConcurrentRequests(16) // up to 16 bulk requests in flight
        .setBackoffPolicy(BackoffPolicy.exponentialBackoff(
            TimeValue.timeValueMillis(50L), 8)) // 50 ms initial delay, at most 8 retries
        .build();
The add method of BulkProcessor is synchronized, so only one thread at a time can put documents into the BulkProcessor:
final IndexRequest request =
    new IndexRequest(indexName, indexDocument.getType(), indexDocument.getId())
        .source(indexDocument.getJson(), XContentType.JSON);
bulkProcessor.add(request);
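
To make the contention concrete, here is a minimal stand-alone sketch of the pattern described above: several reader threads calling a synchronized add on one shared buffer that flushes once a size threshold is reached. `SharedBufferDemo`, `SharedBuffer` and the 4096-character threshold are hypothetical stand-ins for illustration only. This is not the BulkProcessor implementation, and no requests are sent anywhere.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical stand-in for a shared bulk buffer; NOT the Elasticsearch BulkProcessor.
public class SharedBufferDemo {

    static final class SharedBuffer {
        private final long flushThresholdChars;
        private final List<String> pending = new ArrayList<>();
        private long pendingChars = 0;
        private long totalAdded = 0;
        private int flushCount = 0;

        SharedBuffer(long flushThresholdChars) {
            this.flushThresholdChars = flushThresholdChars;
        }

        // synchronized: only one producer thread can be inside add() at a time,
        // so all readers queue up on this single lock.
        synchronized void add(String json) {
            pending.add(json);
            pendingChars += json.length(); // character count used as a rough size estimate
            totalAdded++;
            if (pendingChars >= flushThresholdChars) {
                flush();
            }
        }

        // Called only while holding the lock; a real client would send a bulk request here.
        private void flush() {
            pending.clear();
            pendingChars = 0;
            flushCount++;
        }

        synchronized long totalAdded() { return totalAdded; }
        synchronized int flushCount() { return flushCount; }
    }

    // Starts the given number of reader threads, all adding to one shared buffer.
    static SharedBuffer run(int threads, int docsPerThread, long flushThresholdChars) {
        SharedBuffer buffer = new SharedBuffer(flushThresholdChars);
        List<Thread> workers = new ArrayList<>();
        for (int t = 0; t < threads; t++) {
            final int id = t;
            Thread worker = new Thread(() -> {
                for (int i = 0; i < docsPerThread; i++) {
                    buffer.add("{\"reader\":" + id + ",\"doc\":" + i + "}");
                }
            });
            workers.add(worker);
            worker.start();
        }
        for (Thread worker : workers) {
            try {
                worker.join();
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                throw new IllegalStateException(e);
            }
        }
        return buffer;
    }

    public static void main(String[] args) {
        SharedBuffer buffer = run(10, 1000, 4096);
        System.out.println("added=" + buffer.totalAdded() + " flushes=" + buffer.flushCount());
    }
}
```

Because every add goes through the same lock, a profiler would attribute the waiting time of all 10 threads to that one method, which is the symptom described in the first query below.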
Queries:
- From profiling, I see that most of the time is spent in the internalAdd method of BulkProcessor. That makes sense, since multiple threads are competing to put IndexRequests in the queue. Is this expected, or is there another way to add IndexRequests?
- Should I use multiple BulkProcessor instances, for example one for each of my 10 threads that read the 10 files?
- No matter what bulk size or flush interval I set, the total indexing time is always the same. This is very surprising. Can somebody explain this behavior in my case?
ES version = 6.6.2
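
For reference, the alternative asked about in the second query could be sketched roughly as below: each reader thread owns its own buffer, so no lock is shared between readers. `PerThreadBufferDemo`, `LocalBuffer` and the 128-document threshold are hypothetical stand-ins, not Elasticsearch classes, and the sketch makes no claim about whether this is actually faster. With real BulkProcessor instances, each one would presumably apply its own flush interval, bulk size and concurrent request limit.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical stand-in for "one buffer per reader thread"; NOT the Elasticsearch BulkProcessor API.
public class PerThreadBufferDemo {

    // Unsynchronized buffer: safe only because exactly one thread owns each instance.
    static final class LocalBuffer {
        private final int flushThresholdDocs;
        private final List<String> pending = new ArrayList<>();
        private long totalAdded = 0;
        private int flushCount = 0;

        LocalBuffer(int flushThresholdDocs) {
            this.flushThresholdDocs = flushThresholdDocs;
        }

        void add(String json) {
            pending.add(json);
            totalAdded++;
            if (pending.size() >= flushThresholdDocs) {
                flush();
            }
        }

        // Flush whatever is left when the owning reader finishes its file.
        void close() {
            if (!pending.isEmpty()) {
                flush();
            }
        }

        private void flush() {
            pending.clear(); // a real client would send a bulk request here
            flushCount++;
        }

        long totalAdded() { return totalAdded; }
        int flushCount() { return flushCount; }
    }

    // Starts one reader thread per buffer and returns the total number of documents added.
    static long run(int threads, int docsPerThread, int flushThresholdDocs) {
        LocalBuffer[] buffers = new LocalBuffer[threads];
        Thread[] workers = new Thread[threads];
        for (int t = 0; t < threads; t++) {
            final LocalBuffer own = new LocalBuffer(flushThresholdDocs);
            final int id = t;
            buffers[t] = own;
            workers[t] = new Thread(() -> {
                for (int i = 0; i < docsPerThread; i++) {
                    own.add("{\"reader\":" + id + ",\"doc\":" + i + "}");
                }
                own.close();
            });
            workers[t].start();
        }
        long total = 0;
        for (int t = 0; t < threads; t++) {
            try {
                workers[t].join(); // join() makes the worker's writes visible to this thread
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                throw new IllegalStateException(e);
            }
            total += buffers[t].totalAdded();
        }
        return total;
    }

    public static void main(String[] args) {
        System.out.println("total added = " + run(10, 1000, 128));
    }
}
```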