REST High Level Client - Gracefully flushing BulkProcessor as JVM exits

Are there any recommendations on how to cleanly flush all items that are in the REST High Level Client BulkProcessor (configured with one concurrent request) as the JVM exits? We have a Spring Boot application and have tried both manually flushing the BulkProcessor as the bean is destroyed and calling awaitClose, but in both cases the BulkProcessor does not seem to flush the items out to Elasticsearch, and the bulk items are lost.

This is using REST High Level Client 7.5.1 and Elasticsearch 7.4.0.

Many thanks

Hey,

Closing the BulkProcessor should flush out the remaining items, see https://github.com/elastic/elasticsearch/blob/7.5/server/src/main/java/org/elasticsearch/action/bulk/BulkProcessor.java#L329-L331

However, please take a look at the Javadocs: if concurrent requests are enabled, you need to specify the amount of time you want to wait. Also, what is the return value of awaitClose() when you call it?

Maybe you can also mention how the bulk processor is configured.

Thanks for the reply. We currently have the bulkprocessor configured as follows:

    BulkProcessor.builder(consumer, listener)
            .setBulkActions(1000)
            .setBulkSize(new ByteSizeValue(5, ByteSizeUnit.MB))
            .setFlushInterval(TimeValue.timeValueSeconds(5))
            .setConcurrentRequests(1)
            .setBackoffPolicy(BackoffPolicy.exponentialBackoff(TimeValue.timeValueMillis(100), 3))
            .build();

When we explicitly call awaitClose in the dispose method, it returns immediately. We'll try to get the return value for more information. One thing we did notice is that the Throwable passed to the afterBulk method is an InterruptedException (which I am guessing is expected as the JVM shuts down).

Thanks
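
For readers following along, here is a minimal, library-free sketch of why a blocking wait can return at once with an InterruptedException. It does not use the Elasticsearch client at all: the class name, the method waitForResponse, and the latch standing in for a pending bulk response are all assumptions made for illustration. The only point it shows is that a timed wait on a thread whose interrupt flag is already set fails immediately instead of waiting for its timeout.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;

public class InterruptedWaitSketch {

    // Hypothetical helper: waits up to 200 ms for a "response" that never arrives.
    // Returns "interrupted" if the wait was cut short by a pending interrupt,
    // otherwise "timed out".
    static String waitForResponse(boolean interruptFirst) {
        // Stand-in for a pending bulk response; it is never counted down here.
        CountDownLatch responseArrived = new CountDownLatch(1);

        if (interruptFirst) {
            // Simulates a shutdown that has already interrupted this thread.
            Thread.currentThread().interrupt();
        }
        try {
            boolean arrived = responseArrived.await(200, TimeUnit.MILLISECONDS);
            return arrived ? "completed" : "timed out";
        } catch (InterruptedException e) {
            // await() throws immediately when the interrupt flag is set on entry,
            // and clears the flag before throwing.
            return "interrupted";
        }
    }

    public static void main(String[] args) {
        System.out.println(waitForResponse(true));
        System.out.println(waitForResponse(false));
    }
}
```

If something similar is happening inside the bulk processor's worker thread during shutdown, that would be consistent with afterBulk receiving an InterruptedException, though this sketch alone does not prove it.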

Please check the return value of awaitClose: is it true or false? By default it does not wait, as mentioned in the Javadocs. That was the reason why I explicitly asked whether you check it.

Sorry for the delay - I have now checked the return value and it is returning true (this is with an explicit timeout specified).

Thanks

Can you share the code that does that, so it is easier to follow?

Finally figured it out. In Spring Boot you need to override the taskScheduler bean and set waitForTasksToCompleteOnShutdown. Something like this:

    @Bean
    public ThreadPoolTaskScheduler taskScheduler(){
        ThreadPoolTaskScheduler threadPoolTaskScheduler = new ThreadPoolTaskScheduler();
        threadPoolTaskScheduler.setPoolSize(5);
        threadPoolTaskScheduler.setThreadNamePrefix("ThreadPoolTaskScheduler");
        threadPoolTaskScheduler.setWaitForTasksToCompleteOnShutdown(true);
        threadPoolTaskScheduler.setAwaitTerminationSeconds(2);
        return threadPoolTaskScheduler;
    }

This is because the default taskScheduler interrupts the executing thread on shutdown, which does not allow the BulkProcessor to flush its items.
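
To make the difference concrete, here is a rough sketch using only java.util.concurrent. As far as I understand, the waitForTasksToCompleteOnShutdown flag decides whether the underlying executor is stopped with shutdown() (running tasks finish) or shutdownNow() (running tasks are interrupted). The class name, the flushSurvivesShutdown helper, and the 300 ms sleep standing in for a bulk flush are assumptions for illustration, not Spring or Elasticsearch code.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;

public class SchedulerShutdownSketch {

    // Hypothetical helper: returns true if the simulated flush finished
    // before the executor terminated.
    static boolean flushSurvivesShutdown(boolean waitForTasks) {
        ExecutorService executor = Executors.newSingleThreadExecutor();
        CountDownLatch started = new CountDownLatch(1);
        AtomicBoolean flushed = new AtomicBoolean(false);

        executor.execute(() -> {
            started.countDown();
            try {
                Thread.sleep(300);       // stands in for a blocking bulk request
                flushed.set(true);       // reached only if not interrupted
            } catch (InterruptedException e) {
                // The flush is abandoned; restore the flag for the pool thread.
                Thread.currentThread().interrupt();
            }
        });

        try {
            started.await();             // make sure the task is already running
            if (waitForTasks) {
                executor.shutdown();     // no interrupt: the running task completes
            } else {
                executor.shutdownNow();  // interrupts the running task
            }
            // Comparable in spirit to setAwaitTerminationSeconds(2) above.
            executor.awaitTermination(2, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return flushed.get();
    }

    public static void main(String[] args) {
        System.out.println("shutdownNow: flushed=" + flushSurvivesShutdown(false));
        System.out.println("shutdown:    flushed=" + flushSurvivesShutdown(true));
    }
}
```

The await-termination timeout needs to be long enough for an in-flight bulk request to complete, so the 2 seconds used here may need tuning for slower clusters.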
