Settings to use with RestHighLevelClient and BulkProcessor

I have written the very basic first steps of a Java app to build 6 indexes. Everything works okay with something along these lines:

`highLevelRestClient.bulkAsync(request, bulkActionListener);` 

until I add concurrency, at which point most writes succeed but I get a lot of java.util.concurrent.TimeoutException exceptions.

The error occurs in the low-level client without registering as a failure in the high-level client. I assume this is the correct behaviour, but as far as I can see it makes it difficult to know which items failed and to resend them to the client. (All actions at the moment add new documents to the index, other than the initial create index and type calls.)

After some looking around, I thought I should use the BulkProcessor, as in retrospect it seems to be the best way to handle concurrency (?), and it also seems to listen at the level of the high-level REST client for errors as well as successes and failures. I've struggled with this a little and moved from 5.6.3 to 6.0.0-rc2. The code now builds, but in this version I need to instantiate an org.elasticsearch.threadpool.ThreadPool object, which takes an org.elasticsearch.common.settings.Settings object in its constructor. It may be Google fatigue, but I just can't seem to find any documentation about what needs to go into this settings object.

I've found an example here:

    threadPool = new ThreadPool(Settings.builder().put(Node.NODE_NAME_SETTING.getKey(), "high-level-client").build());

and understand it is basically a map, but I can't quite get my head around whether these are thread pool settings (size etc.) or cluster/index/node settings, and in either case what I would or wouldn't have to set to get it working in the first instance.

So to be more open, this is the code I'm putting in my config file (at an early, pre-smart stage)

    @Bean
    public BulkProcessor bulkProcessor() throws Exception {
        // https://www.elastic.co/guide/en/elasticsearch/client/java-rest/6.0/java-rest-high-document-bulk.html

        ThreadPool threadPool = new ThreadPool(Settings.builder().put().build());

        BulkProcessor.Builder builder = new BulkProcessor.Builder(getRestHighLevelClient()::bulkAsync, listener, threadPool);
        builder.setBulkActions(-1);
        // builder.setBulkSize(new ByteSizeValue(1L, ByteSizeUnit.MB)); // defaults to 5MB
        builder.setConcurrentRequests(5); // defaults to 1
        // builder.setFlushInterval(TimeValue.timeValueSeconds(10L)); // defaults to not set
        builder.setBackoffPolicy(BackoffPolicy.constantBackoff(TimeValue.timeValueSeconds(1L), 3));

        BulkProcessor bp = builder.build();

        return bp;
    }

but I have no idea what to put in this put():

    ThreadPool threadPool = new ThreadPool(Settings.builder().put().build());

Can anyone help point me in the right direction?

Hi,
using the bulk processor is a good idea.

On the settings that you have to pass in: they only need to contain the node.name setting, for historical reasons. The thread pool is also used within Elasticsearch, and we have an assertion there that checks that node.name is set, as it's used as part of the names of the threads that are going to be created, to better identify them later. We weren't happy with this and we changed it, see https://github.com/elastic/elasticsearch/pull/26727 . From 6.1 on, the thread pool won't be required anymore; it is going to be created automatically internally.
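To illustrate why a name is useful when threads are created, here is a minimal plain-Java sketch. It is not Elasticsearch's actual implementation: the class `NamedThreadFactoryDemo`, the helper `namedFactory`, the pool name "bulk", and the exact name format are all assumptions made up for the illustration. It only shows the general idea of embedding a node name in thread names so they are easy to spot in a thread dump.

```java
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

class NamedThreadFactoryDemo {

    // Hypothetical helper (not an Elasticsearch API): creates threads named
    // "[nodeName][poolName][T#n]" and rejects a missing node name up front.
    static ThreadFactory namedFactory(String nodeName, String poolName) {
        if (nodeName == null || nodeName.isEmpty()) {
            throw new IllegalArgumentException("node name must be set");
        }
        AtomicInteger counter = new AtomicInteger();
        return runnable -> {
            String name = "[" + nodeName + "][" + poolName + "][T#" + counter.incrementAndGet() + "]";
            Thread thread = new Thread(runnable, name);
            thread.setDaemon(true);
            return thread;
        };
    }

    public static void main(String[] args) throws InterruptedException {
        // "high-level-client" mirrors the value used in the question's example.
        ExecutorService pool =
            Executors.newFixedThreadPool(2, namedFactory("high-level-client", "bulk"));
        for (int i = 0; i < 4; i++) {
            // Each task prints the name of the worker thread that runs it.
            pool.submit(() -> System.out.println(Thread.currentThread().getName()));
        }
        pool.shutdown();
        pool.awaitTermination(5, TimeUnit.SECONDS);
    }
}
```

Running it prints the worker thread names, each carrying the node name, which is the kind of identification the node.name setting is there to provide.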

On the errors that you mentioned from before you were using the bulk processor: I'm not sure what you mean by low-level client errors that are not propagated to the high-level client. That sounds like a bug. If you have a stack trace for that, could you post more details please?

Cheers
Luca
