Медленно возвращаются результаты из Elasticsearch

У меня стоит задача обновить каждый документ в индексе(их может быть около 100 миллионов). Я должен достать из индекса все эти документы, добавить какие-то новые поля и отправить обратно в эластик. Но я столкнулся с такой проблемой, что когда я использую обычный запрос match_all, с параметром size=10000, то результат возвращается очень долго. И таким образом я запускаю параллельно несколько запросов по 10000 документов и индекс, в котором на данный момент лежит 400000 документов может вычитываться около 30ти секунд, это очень долго. А учитывая то, что могут быть и будут миллионы документов, это нереально использовать. Как можно увеличить скорость запросов? Конфигурация кластера такая : 3 ноды, по 16 gb памяти на каждом

Если надо менять все документы - я бы добавил их в новый индекс и старый индекс потом удалил. Так будет быстрее.

Для большого количества документов надо использовать Scroll API.

Если изменения тривиальные и не требуют внешних источников данных, то можно написать скрипт и запустить его с помощью Update by Query API или Reindex API.

Я понимаю, что добавление в новый индекс будет быстрее, но суть проблемы в том, что всё равно нужно же получить документы из уже существующего индекса и вот именно этот процесс занимает много времени. И да, обновление требует использование внешнего источника данных, но суть проблемы в медленной отработке search запросов

Вы кусок моего ответа про Scroll API прочитали?

Да, Scroll API я тоже пробовал использовать и там результаты были ещё хуже, чем использую просто search с параметрами from и size

Что было в параметре sort, когда вы использовали scroll api?

(FieldSortBuilder.DOC_FIELD_NAME, SortOrder.ASC)

А можно на весь цикл использования Scroll API посмотреть?

public void get(){
        RestHighLevelClient client = new RestHighLevelClient(
                RestClient.builder(
                        new HttpHost("localhost", 9200, "http")));
        SearchRequest searchRequest = new SearchRequest("myIndex");
        SearchSourceBuilder searchSourceBuilder = new SearchSourceBuilder();
        QueryBuilder queryBuilder = QueryBuilders.matchAllQuery();
        searchSourceBuilder.query(queryBuilder);
        searchSourceBuilder.sort(FieldSortBuilder.DOC_FIELD_NAME);
        searchSourceBuilder.size(5000);
        searchRequest.source(searchSourceBuilder);
        final Scroll scroll = new Scroll(TimeValue.timeValueMinutes(10L));
        searchRequest.scroll(scroll);
        try {
            SearchResponse searchResponse = client .search(searchRequest, RequestOptions.DEFAULT);
            String scrollId = searchResponse.getScrollId();
            SearchHit[] allHits = new SearchHit[0];
            SearchHit[] searchHits = searchResponse.getHits().getHits();

            while(searchHits != null && searchHits.length > 0){
                allHits = (SearchHit[]) ArrayUtils.addAll(allHits, searchResponse.getHits().getHits());
                SearchScrollRequest searchScrollRequest = new SearchScrollRequest(scrollId);
                searchScrollRequest.scroll(scroll);
                searchResponse = client .searchScroll(searchScrollRequest, RequestOptions.DEFAULT);
                scrollId = searchResponse.getScrollId();
                searchHits = searchResponse.getHits().getHits();
            }

            ClearScrollRequest clearScrollRequest = new ClearScrollRequest();
            clearScrollRequest.addScrollId(scrollId);
            ClearScrollResponse clearScrollResponse = client.clearScroll(clearScrollRequest, RequestOptions.DEFAULT);
        } catch (IOException e) {
            e.printStackTrace();
        }
    }

Только что проверил ещё раз, на то, чтобы достать 200000 документов ушло около 17 секунд

3 ноды, по 16 gb памяти на каждом

Какой размер хипа у нод и какой средний размер документов?

Вы не могли бы запустить hot_threads во время выполнения скрола и прислать сюда результат?

С кодом проблем не вижу. Если у вас несколько шард в этом индексе, я бы попробовал тянуть параллельно через sliced scroll, поставив slice в количество шард.

Размер хипа у ноды 8gb, а средний размер документа 400байт.
Вот результат hot_threads:

::: {os-elasticsearch-03}{ljjVapvkQ5Ol_MMHhtkjAA}{HDMfs_wWRyqFCEmLr87Waw}{ip}{ip:9300}{dim}
   Hot threads at 2020-10-14T17:39:29.954Z, interval=500ms, busiestThreads=3, ignoreIdleThreads=true:
   
   18.1% (90.2ms out of 500ms) cpu usage by thread 'elasticsearch[os-elasticsearch-03][search][T#7]'
     10/10 snapshots sharing following 20 elements
       app//org.apache.lucene.index.CodecReader.document(CodecReader.java:84)
       app//org.apache.lucene.index.FilterLeafReader.document(FilterLeafReader.java:355)
       app//org.elasticsearch.search.fetch.FetchPhase.loadStoredFields(FetchPhase.java:426)
       app//org.elasticsearch.search.fetch.FetchPhase.getSearchFields(FetchPhase.java:233)
       app//org.elasticsearch.search.fetch.FetchPhase.createSearchHit(FetchPhase.java:215)
       app//org.elasticsearch.search.fetch.FetchPhase.execute(FetchPhase.java:163)
       app//org.elasticsearch.search.SearchService.executeFetchPhase(SearchService.java:387)
       app//org.elasticsearch.search.SearchService.lambda$executeFetchPhase$4(SearchService.java:483)
       app//org.elasticsearch.search.SearchService$$Lambda$3581/0x0000000801659840.get(Unknown Source)
       app//org.elasticsearch.search.SearchService$$Lambda$3325/0x00000008015de440.get(Unknown Source)
       app//org.elasticsearch.action.ActionRunnable.lambda$supply$0(ActionRunnable.java:58)
       app//org.elasticsearch.action.ActionRunnable$$Lambda$2897/0x00000008014b4840.accept(Unknown Source)
       app//org.elasticsearch.action.ActionRunnable$2.doRun(ActionRunnable.java:73)
       app//org.elasticsearch.common.util.concurrent.AbstractRunnable.run(AbstractRunnable.java:37)
       app//org.elasticsearch.common.util.concurrent.TimedRunnable.doRun(TimedRunnable.java:44)
       app//org.elasticsearch.common.util.concurrent.ThreadContext$ContextPreservingAbstractRunnable.doRun(ThreadContext.java:692)
       app//org.elasticsearch.common.util.concurrent.AbstractRunnable.run(AbstractRunnable.java:37)
       java.base@13.0.2/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
       java.base@13.0.2/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
       java.base@13.0.2/java.lang.Thread.run(Thread.java:830)

::: {os-elasticsearch-02}{lUgIgbjgT7yFEbmBY9HVVw}{dwlHqCBQROi4ckIzAK6pFg}{ip}{ip:9300}{dim}
   Hot threads at 2020-10-14T17:39:29.943Z, interval=500ms, busiestThreads=3, ignoreIdleThreads=true:

::: {os-elasticsearch-01}{VRPVhgeWSeKo5qJJXhDsGw}{wHwGsuWwQzyiEtILHxeFAw}{ip}{ip:9300}{dim}
   Hot threads at 2020-10-14T17:39:29.944Z, interval=500ms, busiestThreads=3, ignoreIdleThreads=true:

А по поводу sliced scroll, на данный момент в этом индексе только 1 шард, но это не клиентский кластер и на нём уже будет не один шард, так что можно попробовать, но я не могу найти более менее понятного примера или пояснения для реализации этого на java. Не могли бы вы помочь с этим или хотя бы +- пояснить ключевые моменты sliced scroll?

Да, похоже застряло все на чтении с диска и, возможно, пересылке. Я бы попробовал сделать слайсы на параллельных потоках. А то так у вас все а одну ноду и один диск упирается. Диск-то какой?

Работает точно так-же, только каждый запрос возвращает только результаты с одной шарды. То есть если 3 шарды то надо будет делать три поиска с new SliceBuilder(0, 3),
new SliceBuilder(1, 3), иnew SliceBuilder(2, 3) каждый поиск даст свой scroll id, который надо будет точно также обрабатывать. Можно на SearchSliceIT посмотреть. Он использует транспортный клиент и все на одном потоку, но принцип работы и интерфейс будет похожий. Только цикл for, надо распараллелить.

Честно, сейчас не могу глянуть, что за диск на девовском кластере, но попробую запустить на своём локальном кластере и сделаю 3 шарды на индекс, чтобы попробовать sliced scroll. На локали стоит винт Seagate Barracuda 7200.14 1TB (ST1000DM003). А по поводу реализации sliced scroll. Т.е. мне нужно запустить 3 потока, каждый из которых будет обрабатывать свой шард? Т.е. вот такой принцип работы в каждом потоке?

public void get(){
RestHighLevelClient client = new RestHighLevelClient(
                RestClient.builder(
                        new HttpHost("localhost", 9200, "http")));
        SearchRequest searchRequest = new SearchRequest("myIndex");
        SearchSourceBuilder searchSourceBuilder = new SearchSourceBuilder();
        QueryBuilder queryBuilder = QueryBuilders.matchAllQuery();
        searchSourceBuilder.query(queryBuilder);
        searchSourceBuilder.sort(FieldSortBuilder.DOC_FIELD_NAME);
        searchSourceBuilder.size(5000);
        searchSourceBuilder.slice(new SliceBuilder(0, 3));
        searchRequest.source(searchSourceBuilder);
        final Scroll scroll = new Scroll(TimeValue.timeValueMinutes(10L));
        searchRequest.scroll(scroll);

        try {
            SearchResponse searchResponse = client.search(searchRequest, RequestOptions.DEFAULT);
            String scrollId = searchResponse.getScrollId();
            SearchHit[] allHits = new SearchHit[0];
            SearchHit[] searchHits = searchResponse.getHits().getHits();

            while(searchHits != null && searchHits.length > 0){
                allHits = (SearchHit[]) ArrayUtils.addAll(allHits, searchResponse.getHits().getHits());
                SearchScrollRequest searchScrollRequest = new SearchScrollRequest(scrollId);
                searchScrollRequest.scroll(scroll);
                searchResponse = client.searchScroll(searchScrollRequest, RequestOptions.DEFAULT);
                scrollId = searchResponse.getScrollId();
                searchHits = searchResponse.getHits().getHits();
            }

            ClearScrollRequest clearScrollRequest = new ClearScrollRequest();
            clearScrollRequest.addScrollId(scrollId);
            ClearScrollResponse clearScrollResponse = client.clearScroll(clearScrollRequest, RequestOptions.DEFAULT);
        } catch (IOException e) {
            e.printStackTrace();
        }
    }

А разве не так получается, что просто каждый поток полностью прочитает все документы из индекса и работа будет выполнена трижды? Не нужно для каждого scrollRequest указывать from и size?

Вопрос отпал. С 3мя шардами 200000 документов читаются за 3 секунды. Я попробовал прочитать 2 миллиона документов сначала с 3мя шардами, а потом с 6тью на индексе, но увеличение количества шардов тут не привело к ускорению. С 3мя и с 6тью шардами 2 миллиона вычитываются примерно за 30 - 35 секунд. Но это наверное всё равно много для нашего проекта. И вот вопрос, тут я упираюсь в железо или можно ещё поковыряться в коде и сделать чтение ещё быстрее? Или дело только в железе и например на ssd скорость будет выше? И ещё вопрос, количество шардов лучше не ставить выше количества нод в кластере или можно? И сразу, спасибо огромное за помощь)

И вот вопрос, тут я упираюсь в железо или можно ещё поковыряться в коде и сделать чтение ещё быстрее?

Скорее всего диски все это дело замедляют. Я бы все-равно попробовал слайсы, это немного уменьшит время, которое тратиться на формирование запроса, с жесткими дисками, скорее всего разница большая не будет, но с SSD это может дополнительно помочь.

ли дело только в железе и например на ssd скорость будет выше?

SSD должны быть значительно быстрее. Еще можно с памятью поиграться. Например уменьшить хип до 3ГБ (если вы его для всяких там suggesters не используете) и дать ОС возможность использовать эту память под кэш файловой системы. Для первого чтения это будет без разницы, но на разогретой системе, это может скорость немного ускорить.

И ещё вопрос, количество шардов лучше не ставить выше количества нод в кластере или можно?

С жесткими дисками, я бы не стал, но с SSD можно было бы поэкспериментировать. На вскидку сказать не могу - надо смотреть на загрузку ЦПУ и дискового I/O. Каждая шарда - будет работать на своем потоке (до определенных пределов). Но если диск тормозит, то многочисленные потоки, будут все только тормозить особенно на HDD так как они будут за положение головки бороться. Одна шарда будет из одного места читать, другая из другого - это и так происходит даже с одной шардой (поэтому дополнительный кэш файловой системы тут очень полезен), но с большим количеством шард - это еще хуже. С SSD такой проблемы нет, так как головку двигать не надо.

Могу сказать что сделал я, чтобы ускорить поиск.

Но это относится только к одной моей инсталляции, поэтому не может являться общими требованиями.

  1. Добавил в конфиг elasticsearch.yml:
thread_pool.write.queue_size: 1000
thread_pool.search.queue_size: 500
  1. Создал темплейт для всех индексов:
PUT _template/logs
{
  "order": 1000,
  "index_patterns": [
    "logs*"
  ],
  "settings": {
    "index": {
      "mapping": {
        "total_fields": {
          "limit": "4000"
        }
      },
      "number_of_shards": "3",
      "number_of_replicas": "1",
      "refresh_interval": "30s",
      "translog": {
        "flush_threshold_size": "1024MB",
        "durability": "async"
      }
    }
  }
}

Это не ускорит поиск, это только уменьшит шансы получить ошибку 429 если elasticsearch не справляется с нагрузкой и получает слишком много запросов.

Вы, действительно, ожидаете такое большое количество полей?

Вы, действительно, ожидаете такое большое количество полей?

бывает и такое :slight_smile:

Конкретно в приведенных примерах я хотел обратить внимание на

thread_pool.search.queue_size: 500

и в темплейте на:

      "refresh_interval": "30s",
      "translog": {
        "flush_threshold_size": "1024MB",
        "durability": "async"

Вы уменьшили размер очереди в 2 раза. То есть вместо того, чтобы болтаться в очереди, если у вас больше 500 запросов ждут исполнения, все дополнительные запросы будут сразу получать код 429. Это может помочь, но в некоторых сценариях, но все зависит от того, как на клиентах настроены таймауты и как они повторяют запрос в случае получения 429. На пропускную способность это большого влияния оказать не должно. Так что вы только экономите память, которая уходит под запросы. До версии 7.4.0 это установка, действительно, имело большое значение, но после того как мы стали отменять поиск, когда клиент прерывает соединение это установка влияет на нагрузку гораздо меньше.

Это значительно уменьшит нагрузку от индексации, и косвенным образом ускорит поиск. То есть если ничего не индексируется, то на поиск это никак не будет влият. Но если у вас постоянно меняется индекс по которому вы ищите, то скорость поиска увеличится, за счет того, что вновь проиндексированные изменения могут не появляться в результатах поиска могут до 30-40 секунд и в случае потери питания, у вас могут пропасть последние записи.

1 Like