Settings in BulkProcessor (java)

Apologies, this is similar to a question I asked yesterday, but I'm hoping I have asked it in a more focused way this time.

Essentially I'm trying to understand the Settings object required by the ThreadPool, which in turn is required by the BulkProcessor.

I have a (largely untested) working first implementation of the BulkProcessor for version 6.0.0-rc2. It is rudimentary, and I'm about to tidy it up, split the ThreadPool out into a separate bean and so on. I've put some code below, but really I'm just trying to understand the structure.

At the moment I have 3 beans - a low level rest client, a high level rest client and a bulk processor.

I create new index aliases with the low level rest client and also the high level rest client. On creating the index I pass in both a settings JSON file (as a string) defining tokenisers, hunspell etc. and a mapping file appropriate to the index and type I'm creating. I was using the high level rest client to add documents to the index, but this started failing when I added concurrency, so now I use the high level rest client to create a BulkProcessor, which I use to add documents to the index. (This appears to be much faster than the transport client, but I have ripped out the more complex data processing requirements at my end, so I may be speaking too soon!) I just don't understand what settings I might need to pass in to the ThreadPool, and passing in an empty Settings object doesn't seem to be a problem (although I haven't tried to do much with the index yet other than a basic '_search').

@Bean
public RestClientBuilder coreBuilder() throws Exception {
	RestClientBuilder builder = RestClient.builder(new HttpHost(EsHost, EsPort, DEFAULT_SCHEME));
	builder.setMaxRetryTimeoutMillis(MAX_RETRY_TIMEOUT_MILLIS);
	builder.setFailureListener(...)

	return builder;
}

@Bean
public RestClient getRestLowLevelClient() throws Exception {
	RestClient restClient = coreBuilder().build();
	LOGGER.info("Low level RestClient created");
	return restClient;
}

@Bean
public RestHighLevelClient getRestHighLevelClient() throws Exception {
	RestHighLevelClient client = new RestHighLevelClient(RestClient.builder(new HttpHost(EsHost, EsPort, DEFAULT_SCHEME)));
	return client;
}


//index creation
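// "settings", "mappings" and "aliases" are sibling keys of a single JSON object
// (assumes settings.json and the mapping file each contain a complete JSON object)
String indexSettingsString = "{\"settings\":" + getResourceFile("settings.json")
				+ ", \"mappings\":{\"project\":" + getResourceFile(indexName.GetMappingFileName()) + "},"
				+ "\"aliases\" : {\"alias_1\" : {}} }";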

Response response = restLowLevelClient.performRequest("PUT", "/" + newIndexName, Collections.<String, String>emptyMap(), entityString);

At this point it seems to me I have defined everything that needs defining about the elasticSearch index.
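
For reference, here is a minimal sketch of how a create-index body like the one above could be assembled so that the braces and commas stay balanced. It uses only plain string concatenation; `IndexBodySketch` and `createIndexBody` are hypothetical names made up for illustration, and the sample settings and mapping passed in `main` are placeholders, not my real files.

```java
// Minimal sketch: build the body for "PUT /<index>" from its three parts.
// IndexBodySketch and createIndexBody are hypothetical names, not client API.
final class IndexBodySketch {

    /**
     * Assumes settingsJson and mappingJson are complete JSON objects,
     * e.g. the contents of settings.json and of a mapping file.
     */
    static String createIndexBody(String settingsJson, String typeName,
                                  String mappingJson, String aliasName) {
        return "{"
                + "\"settings\":" + settingsJson + ","
                + "\"mappings\":{\"" + typeName + "\":" + mappingJson + "},"
                + "\"aliases\":{\"" + aliasName + "\":{}}"
                + "}";
    }

    public static void main(String[] args) {
        // Placeholder inputs, only to show the resulting shape.
        String body = createIndexBody(
                "{\"number_of_shards\":1}",
                "project",
                "{\"properties\":{\"title\":{\"type\":\"text\"}}}",
                "alias_1");
        System.out.println(body);
    }
}
```

The resulting string is what would be wrapped in the request entity; nothing in it is shared with the ThreadPool.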

As the bulk processor is created using the high level rest client, which in turn uses the low level rest client, I can't think what settings I need to pass in to the ThreadPool when creating the BulkProcessor.

ThreadPool threadPool = new ThreadPool(Settings.builder().put().build());

Am I misunderstanding the relationships between the various classes being instantiated? Or is it just that, because my use case builds everything from scratch, I'm missing the point that someone else using the BulkProcessor may want to pass in the settings that I am providing to the low level rest client on index creation? Or am I doing it wrong altogether?

This is my bulk processor code, although at the moment it is mainly just copied from the documentation.

//call to BulkProcessor
List<ItemToIndex> items = ...; // list of items to index

items.forEach(p -> bulkProcessor.add(
		new IndexRequest(getIndexName(), TYPE, p.getId()).source(p.exportXContentBuilder())));

//config

BulkProcessor.Listener listener = new BulkProcessor.Listener() {
	
    @Override
    public void beforeBulk(long executionId, BulkRequest request) {
        int numberOfActions = request.numberOfActions(); 
        LOGGER.debug("Executing bulk [{}] with {} requests", executionId, numberOfActions);
        System.out.println("beforeBulk");
    }
    
    @Override
    public void afterBulk(long executionId, BulkRequest request, BulkResponse response) {
        System.out.println("afterBulk");
        if (response.hasFailures()) {
            LOGGER.warn("Bulk [{}] executed with failures", executionId);
        } else {
            LOGGER.debug("Bulk [{}] completed in {} milliseconds", executionId, response.getTook().getMillis());
        }
    }

    @Override
    public void afterBulk(long executionId, BulkRequest request, Throwable failure) {
        System.out.println("afterBulk with failure");
        LOGGER.error("Failed to execute bulk", failure);
    }
};

@Bean
public BulkProcessor bulkProcessor() throws Exception {
	//https://www.elastic.co/guide/en/elasticsearch/client/java-rest/6.0/java-rest-high-document-bulk.html
	
	ThreadPool threadPool = new ThreadPool(Settings.builder().put().build());
	
	BulkProcessor.Builder builder = new BulkProcessor.Builder(getRestHighLevelClient()::bulkAsync, listener, threadPool);
	builder.setBulkActions(-1); // -1 disables flushing by number of actions (defaults to 1000)
	//builder.setBulkSize(new ByteSizeValue(1L, ByteSizeUnit.MB)); // defaults to 5MB
	builder.setConcurrentRequests(5); // defaults to 1
	//builder.setFlushInterval(TimeValue.timeValueSeconds(10L)); // defaults to not set
	builder.setBackoffPolicy(BackoffPolicy.constantBackoff(TimeValue.timeValueSeconds(1L), 3)); // up to 3 retries, 1 second apart

	BulkProcessor bp = builder.build();
	
	return bp;
}

I just don't understand what settings I might need to pass in to the threadpool

No specific settings needed here.
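
To illustrate why, here is a rough sketch of the role a thread pool plays in a bulk processor. As far as I understand it, the pool is only used to schedule work such as periodic flushes and retries, so it knows nothing about the index, its analysis settings or its mappings. `MiniBulkProcessor` is a hypothetical stand-in written with plain `java.util.concurrent`; it is not the Elasticsearch class and leaves out byte-size limits, concurrency control and backoff.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.function.Consumer;

// Hypothetical, simplified stand-in for a bulk processor (NOT the Elasticsearch class).
final class MiniBulkProcessor implements AutoCloseable {
    private final int bulkActions;                  // flush after this many documents; <= 0 disables
    private final Consumer<List<String>> sender;    // whatever actually sends a batch
    private final ScheduledExecutorService scheduler;
    private List<String> buffer = new ArrayList<>();

    MiniBulkProcessor(int bulkActions, long flushIntervalMillis,
                      Consumer<List<String>> sender, ScheduledExecutorService scheduler) {
        this.bulkActions = bulkActions;
        this.sender = sender;
        this.scheduler = scheduler;
        // The pool's only job here is to run the periodic flush; it needs no index settings.
        if (flushIntervalMillis > 0) {
            scheduler.scheduleWithFixedDelay(this::flush,
                    flushIntervalMillis, flushIntervalMillis, TimeUnit.MILLISECONDS);
        }
    }

    synchronized void add(String doc) {
        buffer.add(doc);
        if (bulkActions > 0 && buffer.size() >= bulkActions) {
            flush();
        }
    }

    synchronized void flush() {
        if (buffer.isEmpty()) {
            return;
        }
        List<String> batch = buffer;
        buffer = new ArrayList<>();
        sender.accept(batch);
    }

    @Override
    public synchronized void close() {
        flush();               // send whatever is still buffered
        scheduler.shutdown();  // stop the periodic flush task
    }

    public static void main(String[] args) {
        ScheduledExecutorService pool = Executors.newSingleThreadScheduledExecutor();
        try (MiniBulkProcessor processor = new MiniBulkProcessor(
                2, 0, batch -> System.out.println("sending " + batch), pool)) {
            processor.add("doc-1");
            processor.add("doc-2"); // reaches bulkActions, so a batch is sent
            processor.add("doc-3"); // still buffered; sent when close() runs
        }
    }
}
```

The index settings and mappings travel in the create-index request; the pool above, like the ThreadPool in the question, is just infrastructure for scheduling.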
