I have written the very basic first steps of a Java app to build 6 indexes. All works okay with something along these lines:
`highLevelRestClient.bulkAsync(request, bulkActionListener);`
until I add concurrency; then most writes succeed, but I also get a lot of java.util.concurrent.TimeoutException errors.
The error occurs in the low-level client without registering as a failure in the high-level client. I assume this is the correct behaviour, but as far as I can see it makes it difficult to know which items failed and to resend them to the client. (All actions at the moment add new documents to the index, other than the initial create-index and type calls.)
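A minimal, stdlib-only sketch of the kind of bookkeeping that would make resending possible. This is not the Elasticsearch client API: the class `InFlightBatches` and its method names are hypothetical, invented purely for illustration, and documents are represented as plain strings. The assumption is that each batch is recorded under an id before it is sent, so that if the whole request fails (for example with a TimeoutException) its documents can be recovered and sent again.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

/** Hypothetical helper: remembers each in-flight batch so a failed one can be resent. */
public class InFlightBatches {
    // batch id -> documents in that batch (plain strings here, for brevity)
    private final Map<Long, List<String>> inFlight = new ConcurrentHashMap<>();

    /** Call just before a batch is handed to the client. */
    public void beforeSend(long batchId, List<String> docs) {
        inFlight.put(batchId, new ArrayList<>(docs));
    }

    /** Call when the whole batch succeeded; nothing is left to retry. */
    public void onSuccess(long batchId) {
        inFlight.remove(batchId);
    }

    /** Call when the batch failed as a whole; returns the documents to resend. */
    public List<String> onFailure(long batchId, Throwable cause) {
        List<String> docs = inFlight.remove(batchId);
        return docs == null ? new ArrayList<>() : docs;
    }

    /** Number of batches that have been sent but not yet resolved. */
    public int pendingCount() {
        return inFlight.size();
    }
}
```

The map is concurrent because, with several requests in flight, the success and failure callbacks may arrive on different threads.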
After some looking around, I thought I should use the BulkProcessor, as in retrospect it seems to be the best way to handle concurrency (?), and it also seems to listen at the level of the high-level REST client for errors as well as successes and failures. I've struggled with this a little and moved from 5.6.3 to 6.0.0-rc2. The code now builds, but in this version I need to instantiate an org.elasticsearch.threadpool.ThreadPool object, which takes an org.elasticsearch.common.settings.Settings object in its constructor. It may be Google fatigue, but I just can't seem to find any documentation about what needs to go into this Settings object.
I've found an example here
threadPool = new ThreadPool(Settings.builder().put(Node.NODE_NAME_SETTING.getKey(), "high-level-client").build());
and understand it is basically a map, but I can't quite get my head around whether these are thread pool settings (size etc.) or cluster/index/node settings, and in either case what I would or wouldn't have to set to get it working in the first instance.
So, to be more open, this is the code I'm putting in my config file (at an early, pre-smart stage):
    @Bean
    public BulkProcessor bulkProcessor() throws Exception {
        // https://www.elastic.co/guide/en/elasticsearch/client/java-rest/6.0/java-rest-high-document-bulk.html
        ThreadPool threadPool = new ThreadPool(Settings.builder().put().build());
        BulkProcessor.Builder builder = new BulkProcessor.Builder(getRestHighLevelClient()::bulkAsync, listener, threadPool);
        builder.setBulkActions(-1);
        // builder.setBulkSize(new ByteSizeValue(1L, ByteSizeUnit.MB)); // defaults to 5MB
        builder.setConcurrentRequests(5); // defaults to 1
        // builder.setFlushInterval(TimeValue.timeValueSeconds(10L)); // defaults to not set
        builder.setBackoffPolicy(BackoffPolicy.constantBackoff(TimeValue.timeValueSeconds(1L), 3));
        BulkProcessor bp = builder.build();
        return bp;
    }
but I have no idea what to put in this put():
ThreadPool threadPool = new ThreadPool(Settings.builder().put().build());
Can anyone help point me in the right direction?