BulkProcessor: internalAdd method

We are trying to put millions of documents through BulkProcessor into ES.

We have 10 threads that read 10 files, and each thread adds IndexRequests to a shared BulkProcessor.

// Sends each assembled BulkRequest asynchronously through the high-level REST client.
final BiConsumer<BulkRequest, ActionListener<BulkResponse>> bulkConsumer =
    (request, bulkListener) -> highLevelClient.bulkAsync(request, RequestOptions.DEFAULT, bulkListener);

bulkProcessor =
    BulkProcessor.builder(bulkConsumer, listener)
        .setBulkActions(-1) // -1 disables flushing by number of actions
        .setFlushInterval(TimeValue.timeValueMillis(5000L))
        .setBulkSize(new ByteSizeValue(15, ByteSizeUnit.MB))
        .setConcurrentRequests(16)
        .setBackoffPolicy(BackoffPolicy.exponentialBackoff(
            TimeValue.timeValueMillis(50L), 8))
        .build();

The add method of BulkProcessor is synchronized, so only one thread at a time can put documents into the BulkProcessor.

final IndexRequest request =
    new IndexRequest(indexName, indexDocument.getType(), indexDocument.getId())
        .source(indexDocument.getJson(), XContentType.JSON);
bulkProcessor.add(request);

Queries:

  1. From profiling, I understand that most of the time is spent in the internalAdd method of BulkProcessor. That is not surprising, since multiple threads are competing to put IndexRequests into the queue. Is this expected, or is there another way to add IndexRequests?
  2. Should I use multiple BulkProcessor instances, one for each of my 10 threads that are reading the 10 files?
  3. No matter what bulk size or flush interval I set, the total indexing time is always the same. This is very surprising. Can somebody explain this behavior for my case?

ES version = 6.6.2

There has been a recent fix regarding this behaviour, see https://github.com/elastic/elasticsearch/pull/41451

--Alex
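For readers stuck on a version without that fix, question 2 above (one BulkProcessor per reader thread) can be illustrated with a minimal JDK-only sketch. This is not the Elasticsearch BulkProcessor: `LocalBuffer`, `indexAll`, and the counting `sink` are hypothetical stand-ins, and the sink merely counts documents where a real setup would call something like `bulkAsync`. The point it tries to show is that when each thread owns its buffer, no lock is shared on the add path, so threads only meet at the flush.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;
import java.util.function.Consumer;

/** Hypothetical illustration only; not the Elasticsearch BulkProcessor API. */
public class PerThreadBufferSketch {

    /** Collects items and hands full batches to a callback. Deliberately not thread-safe: one instance per thread. */
    static final class LocalBuffer<T> {
        private final int batchSize;
        private final Consumer<List<T>> flusher;
        private List<T> pending = new ArrayList<>();

        LocalBuffer(int batchSize, Consumer<List<T>> flusher) {
            this.batchSize = batchSize;
            this.flusher = flusher;
        }

        /** No synchronization needed, because only the owning thread calls this. */
        void add(T item) {
            pending.add(item);
            if (pending.size() >= batchSize) {
                flush();
            }
        }

        /** Hands the pending batch to the callback and starts a fresh one. */
        void flush() {
            if (pending.isEmpty()) {
                return;
            }
            List<T> batch = pending;
            pending = new ArrayList<>();
            flusher.accept(batch);
        }
    }

    /** Starts one producer per thread, each with its own buffer, and returns the number of documents flushed. */
    static long indexAll(int threads, int docsPerThread, int batchSize) {
        AtomicLong flushed = new AtomicLong();
        // Stand-in for the real bulk call; here it only counts documents.
        Consumer<List<String>> sink = batch -> flushed.addAndGet(batch.size());

        ExecutorService pool = Executors.newFixedThreadPool(threads);
        for (int t = 0; t < threads; t++) {
            final int id = t;
            pool.execute(() -> {
                LocalBuffer<String> buffer = new LocalBuffer<>(batchSize, sink);
                for (int i = 0; i < docsPerThread; i++) {
                    buffer.add("doc-" + id + "-" + i);
                }
                buffer.flush(); // send the final partial batch
            });
        }
        pool.shutdown();
        try {
            if (!pool.awaitTermination(1, TimeUnit.MINUTES)) {
                throw new IllegalStateException("producers did not finish in time");
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
        return flushed.get();
    }

    public static void main(String[] args) {
        System.out.println(indexAll(10, 1000, 64)); // prints 10000
    }
}
```

Applied to the setup in the question, the analogous change would be to build one BulkProcessor inside each reader thread instead of sharing a single instance. The concurrent-request setting would then likely need to be lowered per instance, since 10 processors with 16 concurrent requests each could overload the cluster; that tuning is an assumption to verify by measurement.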

Cool. That is very similar to my question.
Do the labels on the PR indicate which ES version the fix ships in?

I cannot switch to the latest ES right away, because I am using the standard analyzer :slight_smile:

Yes, the labels indicate which version a PR was merged into.

Even though this is diverging into a different discussion: can you elaborate on why using the standard analyzer prevents you from switching to another major version of Elasticsearch?

My apologies. Your question made us think this through again. Thanks.

We are using the standard token filter as well as the standard tokenizer.
We were under the assumption that the standard analyzer was being deprecated,
because we are getting this message in the deprecation logs:
"The [standard] token filter is deprecated and will be removed in a future version."

In fact, it is the standard token filter that is being removed, not the standard tokenizer or analyzer.

Back to the thread:
We will consider upgrading to the latest version now, but we are eagerly waiting for the internalAdd synchronization fix in v7.3.0.

Note that 7.3 has not been released yet either.

Please check the breaking changes notes for 7.x about the standard token filter at https://www.elastic.co/guide/en/elasticsearch/reference/current/breaking-changes-7.0.html#standard-filter-removed

The standard token filter has been removed because it doesn’t change anything in the stream.

I don't think you are missing out too much :slight_smile:
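Since the filter is a no-op, migrating a custom analyzer should amount to dropping the `standard` entry from its filter chain while keeping the standard tokenizer. A minimal sketch of that cleanup step follows; the filter names `lowercase` and `stop` and the helper `withoutStandardFilter` are illustrative assumptions, not taken from the setup discussed above.

```java
import java.util.Arrays;
import java.util.List;
import java.util.stream.Collectors;

/** Hypothetical helper for cleaning an analyzer's filter chain before an upgrade. */
public class StandardFilterCleanup {

    /** Returns the filter chain without the no-op "standard" token filter; order of the rest is preserved. */
    static List<String> withoutStandardFilter(List<String> filters) {
        return filters.stream()
            .filter(name -> !"standard".equals(name))
            .collect(Collectors.toList());
    }

    public static void main(String[] args) {
        // Example filter chain of a custom analyzer (names are assumptions).
        List<String> current = Arrays.asList("standard", "lowercase", "stop");
        System.out.println(withoutStandardFilter(current)); // prints [lowercase, stop]
    }
}
```

The tokenizer setting of the analyzer stays untouched, since only the token filter of that name was removed.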

Absolutely :slightly_smiling_face:

Thanks for your help! Much appreciated.
