Elasticsearch non-blocking clients: ElasticsearchAsyncSearchAsyncClient and ElasticsearchAsyncClient

I recently upgraded my project to use Elasticsearch,
so I switched my client to ElasticsearchAsyncSearchAsyncClient and ElasticsearchAsyncClient.

Unfortunately, with both clients my response time increased hugely.
I added logging to my code to find the reason. The took time reported by Elasticsearch did not change, but the time it takes to get the response from asyncClient increased.
Here is my code.
Note: I use Vert.x in my project.

public Future<SearchResponse<Map>> search(String routing,
            int size, Instant startTime, Instant endTime) {
        Promise<SearchResponse<Map>> responsePromise = Promise.promise();
        final var termQueryBuilder = new TermQuery.Builder();
        final var rangeQueryBuilder = new RangeQuery.Builder();
        if (startTime != null) {
            rangeQueryBuilder.field(UrlMessage.NEXT_FETCH_FIELD_NAME).gt(JsonData.of(startTime.toString()));
        }
        if (endTime != null) {
            rangeQueryBuilder.field(UrlMessage.NEXT_FETCH_FIELD_NAME).lte(JsonData.of(endTime.toString()));
        }
        String finalIndex = indexSelector.getIndex(routing);
        boolean isMainIndex = finalIndex.equals(index);
        if (isMainIndex) {
            termQueryBuilder.field(ROUTING).value(fv -> fv.stringValue(routing));
        }
        var queryParamsStr = String.format("index:%s and finalIndex:%s, routing:%s, isMainIndex:%b,  startTime:%s, " +
                        "endTime:%s",
                index, finalIndex, routing, isMainIndex, String.valueOf(startTime), String.valueOf(endTime));
        var asyncSearchRequest = SearchRequest.of(r ->
                {
                    log.info("start generate query for ");
                    r.index(finalIndex)
                            .preference(routing)
                            .sort(s -> s.field(new FieldSort.Builder().field(UrlMessage.NEXT_FETCH_FIELD_NAME)
                                    .order(SortOrder.Asc).build()))
                            .size(size);
                    if (startTime != null || endTime != null) {
                        r.query(q -> q.range(rangeQueryBuilder.build()));
                    }
                    if (isMainIndex) {
                        r.postFilter(f -> f.term(termQueryBuilder.build()));
                    }
                    return r;
                }
        );
        var queryStartTime = Instant.now();
        asyncClient.search(asyncSearchRequest, Map.class)
                .thenAccept(a -> {
                    var responseDuration = Duration.between(queryStartTime, Instant.now()).toMillis();
                    log.info("async search response for query params {} took {} ms and response duration {} ms.",
                            queryParamsStr, a.took(), responseDuration);
                    searchByTimeRangeResponseTimeSummary.get(ERROR_TAG, "ok", "none").record(responseDuration);
                    searchByTimeRangeCounter.get(ERROR_TAG, "ok", "none").increment();
                    responsePromise.complete(a);
                })
                .exceptionally(
                        e -> {
                            var responseDuration = Duration.between(queryStartTime, Instant.now()).toMillis();
                            var exceptionClassName = e.getClass().getSimpleName();
                            searchByTimeRangeResponseTimeSummary.get(ERROR_TAG, "fail", exceptionClassName).record(responseDuration);
                            searchByTimeRangeCounter.get(ERROR_TAG, "fail", exceptionClassName).increment();
                            log.error("async search response failed after {} ms for query params {} and searchRequest" +
                                    " {}", responseDuration, queryParamsStr, asyncSearchRequest, e);
                            responsePromise.fail(e);
                            return null;
                        });
        return responsePromise.future();
    }

This is my log output:

14:35:37.960 [I/O dispatcher 1] INFO  IndexClient#lambda$search$33:322 - async search response for query params index:url and finalIndex:url, routing:isna.ir, isMainIndex:true,  startTime:2022-01-22T07:58:12.716887Z, endTime:2022-01-29T07:58:12.716887Z took 18898 ms and response duration 305197 ms.
14:35:38.237 [I/O dispatcher 1] INFO  IndexClient#lambda$search$33:322 - async search response for query params index:url and finalIndex:url, routing:misaq.me, isMainIndex:true,  startTime:2022-03-17T11:39:28.277284Z, endTime:2022-04-11T10:00:38.850104Z took 8908 ms and response duration 299386 ms.
14:35:38.274 [I/O dispatcher 1] INFO  IndexClient#lambda$search$33:322 - async search response for query params index:url and finalIndex:url, routing:zil.ink, isMainIndex:true,  startTime:2022-03-21T02:51:21.025142Z, endTime:2022-04-11T10:00:38.851925Z took 8850 ms and response duration 299422 ms.
14:35:38.288 [I/O dispatcher 1] INFO  IndexClient#lambda$search$33:322 - async search response for query params index:url and finalIndex:url, routing:ninisite.com, isMainIndex:true,  startTime:2022-01-22T09:34:47.882911Z, endTime:2022-01-29T09:34:47.882911Z took 8861 ms and response duration 299435 ms.
14:35:38.358 [I/O dispatcher 1] INFO  IndexClient#lambda$search$33:322 - async search response for query params index:url and finalIndex:url, routing:isna.ir, isMainIndex:true,  startTime:2022-01-22T07:58:12.716887Z, endTime:2022-01-29T07:58:12.716887Z took 19293 ms and response duration 305594 ms.
14:35:38.552 [I/O dispatcher 1] INFO  IndexClient#lambda$search$33:322 - async search response for query params index:url and finalIndex:url, routing:weblite.me, isMainIndex:true,  startTime:2022-01-18T15:31:48.449286Z, endTime:2022-01-25T15:31:48.449286Z took 8840 ms and response duration 298696 ms.
14:35:38.558 [I/O dispatcher 1] INFO  IndexClient#lambda$search$33:322 - async search response for query params index:url and finalIndex:url, routing:zums.ac.ir, isMainIndex:true,  startTime:2022-04-11T04:41:20.407427Z, endTime:2022-04-11T10:00:39.856178Z took 8846 ms and response duration 298702 ms.
14:35:38.632 [I/O dispatcher 1] INFO  IndexClient#lambda$search$33:322 - async search response for query params index:url and finalIndex:url, routing:tbzmed.ac.ir, isMainIndex:true,  startTime:2022-02-19T08:31:30.216535Z, endTime:2022-04-11T10:00:44.760949Z took 1675 ms and response duration 293871 ms.
14:35:38.646 [I/O dispatcher 1] INFO  IndexClient#lambda$search$33:322 - async search response for query params index:url and finalIndex:url, routing:tums.ac.ir, isMainIndex:true,  startTime:null, endTime:2022-04-11T10:00:43.846905Z took 2026 ms and response duration 294799 ms.
14:35:38.799 [I/O dispatcher 1] INFO  IndexClient#lambda$search$33:322 - async search response for query params index:url and finalIndex:url, routing:sheypoor.com, isMainIndex:true,  startTime:2022-01-05T16:46:06.714578Z, endTime:2022-01-12T16:46:06.714578Z took 8950 ms and response duration 298939 ms.
14:35:38.807 [I/O dispatcher 1] INFO  IndexClient#lambda$search$33:322 - async search response for query params index:url and finalIndex:url, routing:refah.ir, isMainIndex:true,  startTime:null, endTime:2022-04-11T10:00:44.760386Z took 1850 ms and response duration 294047 ms.
14:35:38.813 [I/O dispatcher 1] INFO  IndexClient#lambda$search$33:322 - async search response for query params index:url and finalIndex:url, routing:imed.ir, isMainIndex:true,  startTime:null, endTime:2022-04-11T10:00:37.755708Z took 12398 ms and response duration 301057 ms.
14:35:38.892 [I/O dispatcher 1] INFO  IndexClient#lambda$search$33:322 - async search response for query params index:url and finalIndex:url, routing:shatel.ir, isMainIndex:true,  startTime:null, endTime:2022-04-11T10:00:43.847387Z took 2271 ms and response duration 295045 ms.
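
To make the gap concrete, the difference between the two numbers in each log line above can be computed directly. The following is only a minimal sketch: the class name `ResponseGap` and the method `clientSideGapMillis` are hypothetical helpers written for this post, not part of my project or of any Elasticsearch library. It assumes the log line ends in the same "took ... ms and response duration ... ms" format shown above.

```java
import java.util.regex.Matcher;
import java.util.regex.Pattern;

public class ResponseGap {
    // Matches the tail of the log lines above:
    // "took <n> ms and response duration <m> ms"
    private static final Pattern TIMINGS =
            Pattern.compile("took (\\d+) ms and response duration (\\d+) ms");

    /**
     * Returns (response duration - took) in milliseconds,
     * or -1 if the line does not contain both timings.
     */
    public static long clientSideGapMillis(String logLine) {
        Matcher m = TIMINGS.matcher(logLine);
        if (!m.find()) {
            return -1;
        }
        long took = Long.parseLong(m.group(1));             // time reported by Elasticsearch
        long responseDuration = Long.parseLong(m.group(2)); // time measured around asyncClient.search
        return responseDuration - took;
    }

    public static void main(String[] args) {
        // Abbreviated copy of the first log line above (only the timing tail matters here).
        String line = "routing:isna.ir took 18898 ms and response duration 305197 ms.";
        System.out.println(clientSideGapMillis(line)); // prints 286299
    }
}
```

For the first log line this gives 286299 ms, so almost all of the roughly 305 seconds is spent outside the time that Elasticsearch itself reports as took.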

This is my search thread pool cat command:

curl -X GET "server:9200/_cat/thread_pool/search?v=true&h=id,name,active,rejected,completed&pretty"

and this is its output:

id                     name   active rejected completed
FvxD6jlCQk-3-L7i0-mp2w search      0   108381   4725144
PVOAYFI8Qd6Cd_40vC3FKQ search      0   105740   4955279
4Y-0bQiMRbuGV59J6avgEg search      2   106830   4587353
bS0phHy4QuOsIUhs6CEpCg search      2   106432   4787197
3m23_pKPSnuaXFWMhhU-PA search      2   106943   4815429
thSp4pDqSDuqoTWXn8mAlQ search      0   107789   4401516
GmLlKhoWQvuzhMfPP6SB3g search      0   110134   7184929

I suspect there is some kind of concurrency issue.
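
One thing the log does show is that every callback ran on the same thread, `I/O dispatcher 1`. With a `CompletableFuture`, a callback registered through `thenAccept` before the future completes runs on the thread that completes it, while `thenAcceptAsync` with an executor hands the callback to that executor instead. The sketch below illustrates only this JDK behavior. It uses no Elasticsearch or Vert.x classes, and the executors named `fake-io-dispatcher` and `worker`, as well as the class `CallbackThreadDemo`, are hypothetical stand-ins. Whether this is actually related to the delay I see is an assumption, not something the log proves.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.atomic.AtomicReference;

public class CallbackThreadDemo {

    /**
     * Completes a future on a stand-in "I/O" thread and returns the name
     * of the thread that ran the callback.
     *
     * @param handOff if true, register the callback with thenAcceptAsync on a
     *                separate worker executor; if false, use plain thenAccept
     */
    public static String callbackThread(boolean handOff) {
        // Hypothetical stand-ins for the client's I/O thread and an application worker.
        ExecutorService ioThread =
                Executors.newSingleThreadExecutor(task -> new Thread(task, "fake-io-dispatcher"));
        ExecutorService workers =
                Executors.newSingleThreadExecutor(task -> new Thread(task, "worker"));
        try {
            CompletableFuture<String> response = new CompletableFuture<>();
            AtomicReference<String> seen = new AtomicReference<>();

            // The callback is registered before the future is completed.
            CompletableFuture<Void> done = handOff
                    ? response.thenAcceptAsync(
                            body -> seen.set(Thread.currentThread().getName()), workers)
                    : response.thenAccept(
                            body -> seen.set(Thread.currentThread().getName()));

            // Complete the future from the stand-in I/O thread.
            ioThread.execute(() -> response.complete("ok"));

            done.join(); // wait until the callback has run
            return seen.get();
        } finally {
            ioThread.shutdown();
            workers.shutdown();
        }
    }

    public static void main(String[] args) {
        System.out.println(callbackThread(false)); // fake-io-dispatcher
        System.out.println(callbackThread(true));  // worker
    }
}
```

In the plain `thenAccept` case, any work done inside the callback occupies the completing thread until the callback returns, which is why the thread name in the log may be worth checking.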

Finally, we solved the problem by using the Vert.x WebClient:

var mustQuery = new JsonObject().put("must", mustJsonArray);
        var requestBody = new JsonObject()
                .put("query", new JsonObject().put("bool", mustQuery))
                .put("sort", new JsonArray(List.of(
                                new JsonObject().put("nextFetch",
                                new JsonObject().put("order", "asc")))));
        webClient.getAbs(queryString)
                .sendJsonObject(requestBody)
                .onFailure(responsePromise::fail)
                .onSuccess(s -> {
                    var searchResponse = new JsonObject(s.bodyAsString()).mapTo(ElasticResponseMessage.class);
                    log.info(searchResponse);
                    responsePromise.complete(searchResponse);
                });

I think the mistake is in my implementation, in how I am using the Elasticsearch non-blocking client.
