ES 7.8.0 Java RestHighLevelClient UpdateByQueryAsync throws SocketTimeoutException

Hi all,

First post here, so I should say: we love Elasticsearch! Some big fans on our team.

I'm looking into an issue we're seeing on ES 7.8.0 using the Java RestHighLevelClient, where an expensive UpdateByQuery is throwing a SocketTimeoutException early, before it completes. I'm not sure if we're doing something wrong in how we're calling it.

Here's some cut-down code (with some bits chopped out and moved around for clarity; the real thing happens via several methods in some service classes), followed by some notes on what I've tried and the error we see.

RestClientBuilder builder = RestClient.builder(new HttpHost(host, port, connectionType))
def client = new RestHighLevelClient(builder.setRequestConfigCallback(
    new RestClientBuilder.RequestConfigCallback() {
        @Override
        RequestConfig.Builder customizeRequestConfig(RequestConfig.Builder requestConfigBuilder) {
            return requestConfigBuilder.setSocketTimeout(5000)
        }
    }
))

def updateScript = """... ( add or remove a value in a field ) ..."""

UpdateByQueryRequest request = new UpdateByQueryRequest('foo')

request.setScript(new Script(updateScript))
request.setQuery(...)
request.setSlices(0)

BulkByScrollResponse bulkResponse
Exception exception

long maxTimeoutInMilliSeconds = 300000

// Create a listener that is attached to the async bulkRequest that will set the bulkResponse ( or exception )
// when the request has been finalised.
ActionListener<BulkByScrollResponse> listener = new ActionListener<BulkByScrollResponse>() {
    // On Response, set the response to the variable we initialised earlier.
    @Override
    void onResponse(BulkByScrollResponse response) {
        bulkResponse = response
    }
    // onFailure, set the exception to the variable we initialised earlier.
    @Override
    void onFailure(Exception e) {
        exception = e
    }
}

// Create a CountDownLatch which is used in conjunction with the LatchedActionListener to wait for the
// async bulkResponse request to finish.
final CountDownLatch latch = new CountDownLatch(1)
// Wrap the listener in a LatchedActionListener, which counts down the latch when the request completes or fails.
listener = new LatchedActionListener<>(listener, latch)

client.updateByQueryAsync(request, RequestOptions.DEFAULT, listener)

// Block until the listener counts down the latch or the maximum timeout elapses.
if (!latch.await(maxTimeoutInMilliSeconds, TimeUnit.MILLISECONDS)) {
    println ("Maximum timeout [${maxTimeoutInMilliSeconds}ms] reached for updateByQuery.")
    throw new TimeoutException("Maximum timeout was reached for updateByQuery")
}

// If the request resulted in an onFailure event, log the exception and rethrow it.
if (exception) {
    println ("Error executing updateByQuery")
    println exception
    throw exception
}

Run this with a query that takes a while, say a few minutes.

The output we get is:

Error executing updateByQuery

java.net.SocketTimeoutException: 5,000 milliseconds timeout on connection http-outgoing-58 [ACTIVE]

I would have expected this to complete successfully or, if our query took more than 300 seconds, to print the "Maximum timeout ..." message from the failed latch.await.

I've tried a few things here:

  • Setting batchSize to small numbers on the request, thinking that maybe ES was busy working on one big batch and wasn't able to respond. Even with this set to 1, I get the socket exception.

  • I don't understand why we're using the async call here, since we appear to be treating it synchronously anyway. The file history shows we changed this from synchronous to async to avoid the socket timeout error, but when I tested going back to the sync call we hit the same socket exception we see now. If we can remove the added complexity of the async call here, that would be nice, I think.

Also, worth noting:

  • This pattern of using an UpdateByQueryRequest with a LatchedActionListener is not what I see in the high level rest client docs. I only see it in the low level client docs, in a section on multiple parallel async requests.

  • I can see tasks spawned in the ES task lists for this query which run after the exception is thrown and seem to update all the docs. I was wondering if we should be getting the task ID somehow and polling on that? But I can't see how to get it for an updateByQuery or updateByQueryAsync call.

...

Is this expected behaviour? A bug? Are we doing something wrong?

Should we be setting our socketTimeout high enough to cope with our longest running requests? This seems a bit messy - I'd prefer a way for long running requests to keep going unless there was a problem.

Should we be polling / waiting on the ES task / request state in a different way?

Any insights would be greatly appreciated!

I see this topic - Socket timeout during reindexAsync in RestHighLevelClient - and it sounds like a similar situation - but I can't see any submitUpdateByQueryTask on the RestHighLevelClient. Would this be the right way to approach it? Can I use the low level client to do this? I'll take a look into this approach...

I think https://github.com/elastic/elasticsearch/pull/58552 is what we're waiting on to add support for the task-based variant of updateByQuery to the RestHighLevelClient.

IIUC, the async calls in the RestHighLevelClient will time out with socket exceptions by design if they take longer than the socket timeout, and a task is the correct way to run a long-running job.
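
To illustrate why the latch never gets the chance to time out in our code: the listener's onFailure fires as soon as the socket timeout trips, and that counts the latch down long before the 300 seconds are up. Here's a minimal JDK-only simulation of that, not the real client. A scheduled task stands in for the HTTP transport, and the class and method names are made up for illustration.

```java
import java.net.SocketTimeoutException;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;

// Hypothetical simulation: no Elasticsearch classes are involved.
final class LatchVersusSocketTimeout {

    /**
     * Simulates an async call whose transport gives up after socketTimeoutMs
     * while the caller waits up to latchTimeoutMs on a latch.
     * Returns the failure handed to the "listener", or null if the latch wait expired first.
     */
    static Exception simulate(long socketTimeoutMs, long latchTimeoutMs) {
        AtomicReference<Exception> failure = new AtomicReference<>();
        CountDownLatch latch = new CountDownLatch(1);
        ScheduledExecutorService transport = Executors.newSingleThreadScheduledExecutor();
        try {
            // Stand-in for the transport: report a socket timeout, then release the latch,
            // which is what a latched listener does after onFailure.
            transport.schedule(() -> {
                failure.set(new SocketTimeoutException(
                        socketTimeoutMs + " milliseconds timeout on connection"));
                latch.countDown();
            }, socketTimeoutMs, TimeUnit.MILLISECONDS);

            // The caller's wait ends as soon as the latch is released, however long the limit is.
            latch.await(latchTimeoutMs, TimeUnit.MILLISECONDS);
            return failure.get();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException("Interrupted while waiting on the latch", e);
        } finally {
            transport.shutdownNow();
        }
    }

    public static void main(String[] args) {
        // Short socket timeout, long latch timeout: the socket timeout wins.
        System.out.println(simulate(50, 2000));
        // Long socket timeout, short latch timeout: the latch wait expires first.
        System.out.println(simulate(2000, 50));
    }
}
```

So with a 5 second socket timeout and a 300 second latch timeout, the socket exception is always the first thing the caller sees for any request that runs longer than 5 seconds.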

Using the low level client is another option, I think, but I need to investigate what that would look like and the impact on our codebase.
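
As a rough sketch of what the task-based approach involves over plain REST: submit POST /<index>/_update_by_query with wait_for_completion=false, read the task ID from the response body, then poll GET /_tasks/<task id> until "completed" is true. The helper below only builds those paths and picks the relevant fields out of the response bodies with JDK regexes. The class and method names are hypothetical, the sample bodies in main are made-up examples, and a real implementation would send the requests through the low level client and use a proper JSON parser.

```java
import java.util.regex.Matcher;
import java.util.regex.Pattern;

// Hypothetical helper: builds REST paths and reads fields from response bodies.
// It does not talk to Elasticsearch itself.
final class UpdateByQueryTaskSketch {

    // Matches a string-valued "task" field, e.g. {"task":"node-1:12345"}.
    private static final Pattern TASK_FIELD =
            Pattern.compile("\"task\"\\s*:\\s*\"([^\"]+)\"");

    // Matches "completed": true in a task status body.
    private static final Pattern COMPLETED_TRUE =
            Pattern.compile("\"completed\"\\s*:\\s*true");

    /** Path that submits the update as a background task instead of holding the socket open. */
    static String submitPath(String index) {
        return "/" + index + "/_update_by_query?wait_for_completion=false";
    }

    /** Returns the task ID from the submit response, or null if none is present. */
    static String extractTaskId(String submitResponseBody) {
        Matcher m = TASK_FIELD.matcher(submitResponseBody);
        return m.find() ? m.group(1) : null;
    }

    /** Path to poll for the state of a previously submitted task. */
    static String taskStatusPath(String taskId) {
        return "/_tasks/" + taskId;
    }

    /** True once the task status body reports the task as completed. */
    static boolean isCompleted(String taskStatusBody) {
        return COMPLETED_TRUE.matcher(taskStatusBody).find();
    }

    public static void main(String[] args) {
        // Made-up sample bodies, for illustration only.
        String submitResponse = "{\"task\":\"node-1:12345\"}";
        String taskId = extractTaskId(submitResponse);

        System.out.println("POST " + submitPath("foo"));
        System.out.println("GET  " + taskStatusPath(taskId));
        System.out.println(isCompleted("{\"completed\":false,\"task\":{}}"));
        System.out.println(isCompleted("{\"completed\":true,\"task\":{}}"));
    }
}
```

Each poll is a short request, so the socket timeout can stay small, and the caller decides separately how long it is willing to keep polling.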
