Bulk processor: Cannot find bulk threads while profiling

We are using BulkProcessor for indexing data.
We have set the concurrentRequests parameter to 5.

I am profiling my application and expected to see 5 bulk request threads in the live thread monitoring, but I cannot find them, even though indexing is going on successfully.

Any suggestions here?

What Elasticsearch version are you using?

You might want to check for a thread pool named write, as the bulk thread pool has been renamed to write and merged with the index one.

See https://github.com/elastic/elasticsearch/pull/29593 and https://github.com/elastic/elasticsearch/pull/29556
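
If it helps, here is a quick way to confirm that from a client (just a sketch using the low-level REST client; the host, port and column selection are placeholders of mine):

import org.apache.http.HttpHost;
import org.apache.http.util.EntityUtils;
import org.elasticsearch.client.Request;
import org.elasticsearch.client.Response;
import org.elasticsearch.client.RestClient;

public class WriteThreadPoolCheck {
    public static void main(String[] args) throws Exception {
        // Placeholder host/port; point this at one of your nodes.
        try (RestClient restClient = RestClient.builder(new HttpHost("localhost", 9200, "http")).build()) {
            // On 6.x, bulk and index operations run on the "write" thread pool.
            Request request = new Request("GET", "/_cat/thread_pool/write");
            request.addParameter("v", "true");
            request.addParameter("h", "node_name,name,active,queue,rejected");
            Response response = restClient.performRequest(request);
            System.out.println(EntityUtils.toString(response.getEntity()));
        }
    }
}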

The ES version I am using is 6.6.2.
So it is the write pool that I can see when profiling the ES machine. That is fine.

My question is related to the BulkProcessor that I am using in my client to generate bulk indexing requests.
No matter how many concurrentRequests I set in my client, I cannot see the corresponding live threads while profiling.
My code:

final BiConsumer<BulkRequest, ActionListener<BulkResponse>> bulkConsumer =
        (request, bulkListener) -> highLevelClient.bulkAsync(request, RequestOptions.DEFAULT, bulkListener);

bulkProcessor = BulkProcessor.builder(bulkConsumer, listener)
        .setBulkActions(-1)                                  // no limit on the number of actions per bulk
        .setFlushInterval(TimeValue.timeValueMillis(5000L))  // flush at least every 5 seconds
        .setBulkSize(new ByteSizeValue(15, ByteSizeUnit.MB)) // flush once 15 MB have accumulated
        .setConcurrentRequests(5)                            // allow up to 5 bulk requests in flight
        .setBackoffPolicy(BackoffPolicy.exponentialBackoff(TimeValue.timeValueMillis(50L), 8))
        .build();

How are concurrent requests generated internally? How do they work? I am interested in how they actually work so that I can tune my BulkProcessor indexing parameters.

The BulkProcessor instance is meant to be reused in your client among several threads. For more information you might want to check the BulkRequestHandler.execute method.
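
As a rough illustration of that pattern (only a sketch that assumes the bulkProcessor instance built above; the index name, type and worker count are placeholders of mine), several application threads can all feed the same processor, which then takes care of batching, flushing and the concurrent /_bulk executions:

import org.elasticsearch.action.index.IndexRequest;
import org.elasticsearch.common.xcontent.XContentType;

// Ten worker threads sharing the single bulkProcessor instance; each just calls add().
Runnable worker = () -> {
    for (int i = 0; i < 10_000; i++) {
        bulkProcessor.add(new IndexRequest("my-index", "_doc")
                .source("{\"value\":" + i + "}", XContentType.JSON));
    }
};
for (int t = 0; t < 10; t++) {
    new Thread(worker, "indexer-" + t).start();
}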

Thanks for your reply.

Yes, I have gone through that method and also debugged it.

My observations:
If I have a single BulkProcessor instance and all my threads (say 10) are putting data in via the .add method, I will always have a single /_bulk request at a time, because the Semaphore allows a single thread at a time.

Query:
Only one /_bulk request goes to ES at any time. The number of threads that put data into the BulkProcessor is in my control (say 10).
Where and how does the concurrentRequests parameter come into the picture?

If you have 10 threads, you could set the number of concurrent requests going to ES to a smaller number, so that some of your threads block once that number is reached and more data needs to be sent to ES.


The number of concurrent requests (/_bulk) going to ES from my client is actually always 1, as I have only one instance of BulkProcessor.

Once a single _bulk request (say with 15 MB of IndexRequests) reaches ES, I understand that those are executed concurrently there.
I may be missing something, but I still do not see where in the client the concurrentRequests are spawned.

My current understanding of the flow is:
Step 1: 10 threads add data to the single bulkProcessor.
Step 2: The BulkProcessor stores all the IndexRequests until it fills a _bulk request (15 MB in my case).
Step 3: A single _bulk request with 15 MB of IndexRequests is fired at ES.
Step 4: Repeat sequentially from Step 1.

Where in steps 1 to 4 does concurrentRequests play a part? I can see nothing happening concurrently on the client side.

You need to use a single bulk processor instance with multiple threads on the client side in order to hit the concurrent requests limit of that bulk processor instance.

The idea here is that on the client side you can add an arbitrary number of documents from an arbitrary number of threads to the bulk processor, and the bulk processor handles the sending of requests and also blocks if you are trying to index more documents than the limits you set when creating the bulk processor.

Imagine you set the limit to 1000 documents and 5 concurrent requests, and that a single execution of a bulk request takes 5 seconds (for the sake of the example). You keep adding index requests to the bulk processor: after half a second you have 1000 documents, after another 500 ms another 1000, so after 2500 ms there are 5 bulk requests being executed against ES, all still running. If you keep adding documents and hit the 1000-document limit again, that request will not be sent to ES immediately; instead, the call to add blocks until one of the currently executing requests has finished.
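
To make the gating concrete, here is a stripped-down model of that mechanism (only a sketch in the spirit of BulkRequestHandler, not the actual Elasticsearch code; class and variable names are mine). A semaphore with concurrentRequests permits is acquired before each bulk is handed to the async consumer and released when the response or failure comes back, so up to concurrentRequests bulks can be in flight and the next flush blocks on acquire():

import java.util.concurrent.Semaphore;
import java.util.function.BiConsumer;

import org.elasticsearch.action.ActionListener;
import org.elasticsearch.action.bulk.BulkRequest;
import org.elasticsearch.action.bulk.BulkResponse;

// Simplified model of the concurrency gate, not the real BulkRequestHandler.
class ConcurrentBulkGate {
    private final Semaphore permits;
    private final BiConsumer<BulkRequest, ActionListener<BulkResponse>> consumer;

    ConcurrentBulkGate(int concurrentRequests,
                       BiConsumer<BulkRequest, ActionListener<BulkResponse>> consumer) {
        this.permits = new Semaphore(concurrentRequests); // one permit per allowed in-flight bulk
        this.consumer = consumer;
    }

    void execute(BulkRequest bulk) throws InterruptedException {
        permits.acquire(); // blocks only once 'concurrentRequests' bulks are already in flight
        consumer.accept(bulk, ActionListener.<BulkResponse>wrap(
                response -> permits.release(),  // free the slot when the bulk completes...
                exception -> permits.release()  // ...or fails
        ));
    }
}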

Does this make more sense?
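
If you want to observe this from the client without a profiler, one option (again only a sketch; the counter and logging are mine) is to count bulks between beforeBulk and afterBulk in the BulkProcessor.Listener you already pass to the builder. With bulkAsync and setConcurrentRequests(5) you should see the count climb above 1 whenever the client produces batches faster than ES acknowledges them:

import java.util.concurrent.atomic.AtomicInteger;

import org.elasticsearch.action.bulk.BulkProcessor;
import org.elasticsearch.action.bulk.BulkRequest;
import org.elasticsearch.action.bulk.BulkResponse;

// Hypothetical listener that tracks roughly how many bulks are in flight at once.
BulkProcessor.Listener listener = new BulkProcessor.Listener() {
    private final AtomicInteger inFlight = new AtomicInteger();

    @Override
    public void beforeBulk(long executionId, BulkRequest request) {
        System.out.println("bulks in flight: " + inFlight.incrementAndGet());
    }

    @Override
    public void afterBulk(long executionId, BulkRequest request, BulkResponse response) {
        inFlight.decrementAndGet();
    }

    @Override
    public void afterBulk(long executionId, BulkRequest request, Throwable failure) {
        inFlight.decrementAndGet();
    }
};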

A bit clearer now.

Do you mean that the BulkProcessor sends 5 concurrent _bulk REST calls to ES from the client?
If yes, I have debugged the code and seen that this is never the case: these 5 requests are sent sequentially, due to the Semaphore.
