Конфигурация Threadpool

Добрый день.
Разбираюсь с возможными конфигурациями ES. Сейчас пытаюсь играться с настройками для ускорения заливки данных (hdfs -> elasticsearch-spark -> ES).
Пытался пару раз изменять настройки threadpool.bulk (увеличивал количество потоков), но данные настройки не применялись. После копания в сорсах ES, обнаружил, что эти вещи регулируются глубоко в ES и зависят от количества ядер (Runtime.getRuntime().availableProcessors()) и по-умолчанию не превышают 32.
С другой стороны, есть свойство es.processors.override, которым можно это справить...
Собственно, вопроса 2:

  • какие причины того, что было сделано именно так? Судя по докам, мануалам и гуглению, эти настройки в целом легко и часто меняются (подозреваю, что раньше оно именно так и было, но в какой то момент решили это поменять).

  • в принципе, имеет ли смысл проверять данные настройки для оптимизации заливки данных? Сейчас на своем тестовом кластере я не вижу никакой просадки ни по каким параметрам - ресурсы задействованы максимум на половину (мониторю заливку marvel`ом + nmon (linux)).

Какую версию elasticsearch вы используете?
Как вы пытались менять настройки?
Как вы определили, что новые настройки не применялись?

Версия 2.3.2

Менял настройки и изменением elasticsearch.yml, и через REST-API.

Проверял разными способами:

  • /_cat/thread_pool?v - в момент заливки данных bulk.active никогда не превышал 32 (при этом bulk.queue не был пустым)
  • /_nodes/thread_pool?pretty - bulk.min=bulk.max=32
  • Также отдельно проверял эти настройки на локальном ES - приконнектился debug`ом и посмотрел данные параметры при запуске ES и при разных вызовах API (типа /_cat/thread_pool)

Везде значения были отличны от тех, что указаны в elasticsearch.yml.

Да, начиная с 2.1.2 elasticsearch не позволяет количество потоков в bulk и index превышать количество процессоров, поскольку это только замедляет процесс индексирования вместо его ускорения. См дискуссию тут - https://github.com/elastic/elasticsearch/issues/15582

Спасибо за ссылку на issue.
Мотив понятен.

PS. Хотя сейчас у меня ровно такая же ситуация, как у последнего комментатора - кластер используется не на полную мощность :slight_smile:

А что с диском твориться на нодах? Вы уверены, что это у Вас не диск тормозит?

А вы пытались воспользоваться советами в Performance Considerations for Elasticsearch 2.0 Indexing | Elastic Blog ?

И Cloudera Manager, и nmon показывают, что диски используются на ~10%, в пиках не больше 25%. При этом я сталкивался с тем, что при неправильной настройке индексирования диски действительно не справлялись, и загрузка была 100%. Сейчас смотрю ночные графики работы джобы по заливке данных - скорость постоянна и равно ~510 МБ/с. При этом на кластере в рабочее время можно видеть всплески активности до 5 Гб (от других джоб).[quote="Igor_Motov, post:6, topic:51054"]
А вы пытались воспользоваться советами в Performance Considerations for Elasticsearch 2.0 Indexing | Elastic Blog ?
[/quote]

  1. Store throttling is now automatic - не трогал, но, как я понял, это и не надо теперь регулировать.
  2. Multiple path.data - вот здесь у меня есть вопросы на тему того, как правильно сделать..
    Сейчас у меня конфигурация такова, что каждая нода имеет доступ к каждому диску (9 нод, 10 дисков). Не уверен, что это до конца правильно..
  3. The Auto-ID optimization is removed - тоже ничего не делал, просто отдал это Sparkу: JavaEsSpark.saveToEs(resultRdd, ImmutableMap.of("es.resource", esResource, "es.mapping.id", esMappingId, "es.nodes", esNodes));
  4. More indexing changes... - с компрессией пока не разбирался...

В целом, за вчерашний день копания в сорсах ES я обнаружил несколько пропертей, которые могут в теории ускорить индексирование.
Буду проверять их после того, как ES закончит создавать реплики для моего тестового индекса. К сожалению, сделать копию 10 ТБ быстро не получается :slight_smile:

Не понял, это как? То есть у них у всех 10 путей в path.data прописано и эти пути одинаковые? А диски как подключены?

У вас размер bulk запроса какой?

Вы не могли бы curl localhost:9200/_nodes/hot_threads?ignore_idle_threads=false&threads=10000 запустить несколько раз, сохранить результат где-нибудь и прислать сюда сылку?

Да, выглядит это вот так (для каждой из нод):
path.data = ["/data1/es", "/data2/es",...]
/data1 - это путь до определенного диска, их 10 штук и они никак между собой не взаимодействуют.[quote="Igor_Motov, post:8, topic:51054"]
У вас размер bulk запроса какой?
[/quote]
Я задаю spark.es.batch.size.bytes=30mb и spark.es.batch.size.entries=15000, размер самого документа примерно 2кб.[quote="Igor_Motov, post:8, topic:51054"]
Вы не могли бы curl localhost:9200/_nodes/hot_threads?ignore_idle_threads=false&threads=10000 запустить несколько раз, сохранить результат где-нибудь и прислать сюда сылку?
[/quote]

Выложу завтра, когда закончится копирование шардов и я смогу запустить индексирование.

Вообще, я смотрел hot_threads, в топе всегда были только Lucene merge thread и Bulk.

Если я вас правильно понял, то /data1/es/ на каждой ноде использует один и тот же физический диск. Поскольку elasticsearch не знает что это один и тот же диск, то он может разместить на этом диске и праймари и реплику, что может, в свою очередь, привести к полной потере данных, если этот диск накроется. Я не думаю, что такое размещение данных - хорошая идея. К тому же, учитывая как шарды распределяются между дисками, я не удивлюсь, если отдельные диски в такой конфигурации перегружаются, когда как другие диски - простаивают.

Да, поняли вы правильно. Данную конфигурацию я делал на первых порах... В общем, поэтому и спрашивал..[quote="Igor_Motov, post:10, topic:51054"]
Поскольку elasticsearch не знает что это один и тот же диск, то он может разместить на этом диске и праймари и реплику, что может, в свою очередь, привести к полной потере данных, если этот диск накроется
[/quote]

Да, тут вы конечно тоже правы. С другой, все таки я собираюсь следовать заветам многих спецов и не использовать ES как primary-хранилище. Информация будет дублироваться, другой вопрос, что восстанавливать её даже внутри кластера будет проблематично: у меня сегодня упала одна дата нода (по моей вине) -> рекавери растянулось на весь день...

Вполне возможно мне просто везло... Но опять же - как я ни старался, упереться в какие то конкретные ресурсы мне пока так и не удалось.


Выложил архив с 5-ю снэпшотами тредов + графиками загрузки всего кластера (не только ES), а также графики марвела.

По существу: сейчас конфигурация кластера такая, как я описывал ранее. Плюс - хранится один индекс в 10ТБ + 1 реплика (6.5 миллиардов доков).

В момент получения информации в кластер заливаются те же данные (в другой индекс).
Параметры индекса:
{"settings":{"index":{"number_of_replicas":"1","number_of_shards":"50","refresh_interval":"-1","translog":{"sync_interval":"10s","flush_threashold_ops":"100000","flush_threashold_size":"4098mb","interval":"10s"}}}}

Размер одного дока примерно 2кб

Есть еще пара вопросов

  1. Сколько физической памяти на машинах и какой размер хипа на нодах?
  2. Похоже, что данные обновляются с помощью update, это сделано потому, что у вас нет доступа ко всей записи, когда вы ее обновляете?
  3. Вы используете этот кластер для поиска во время индексации?

Физической памяти около 520Гб (vmstat говорит 529) на каждой ноде. Хипа выделяю 30Гб.

Доступ у меня полный - загружаю из JSONа, который сам конструировал до этого. Лежит он в hdfs, грузится Sparkом, В Spark все это передается с параметром upsert, что, как я понял, в моем случае должно заменяться на insert (так как данные пишу в пустой индекс).[quote="Igor_Motov, post:13, topic:51054"]
Вы используете этот кластер для поиска во время индексации?
[/quote]

Нет. Вообще, я планирую делать 2 конфигурации кластера - одну для заливки данных, другую для поиска.

Сейчас все сводится к тому, что надо очень много данных залить в ES. В дальнейшем по ним будет только поиск. Обновление будет редким явлением.

Я спросил про память потому, что на некоторых нодах памяти явно не хватает - makea35, например. Надо разобраться, куда эта память уходит, и исправить ситуацию. Если поиск при этом не происходит, то, возможно, проблема с документами, которые вы индексируете или bulk queue слишком длинная.

Что касается дисков, то судя по низкой загрузке CPU и высоком Load Average, я, думаю, что проблема все-таки с I/O. Я бы начал с упрощения вашей дисковой организации - надо каждой ноде дать свой диск, желательно SSD и, желательно локальный, а не подсоединенный через сеть.

Во время заливки, можно еще попробовать переключиться на "index.translog.durability": "async" и увеличить sync_interval до 60 сек. Если вам отключат электричество во всем компьютерном центре, то можно потерять результат последней минуты работы, но в вашем случае это не страшно.

flush_threashold_ops можно выкинуть - все-равно в имени ошибка и оставить только flush_threashold_size.

Спасибо за советы.

Все предложенные варианты попробую..

Покажи elasticsearch.yml?

Если диски не SSD и нет хорошего writeback-кеша - отдать каждой ноде по одному диску и поставить "index.merge.scheduler.max_thread_count: 1". Не посылает ли spark при на каждый bulk-запрос лишних /_forcemerge?

Правильно понимаю что мастер-ноды/дата-ноды не разделены? Раздели, можно наверное мастера запустить на тех же хостах раз они всё равно курят и оперативки столько лишней.

Ничего не сказано про сеть, конечно вряд ли ты про нее забыл когда смотрел во что оно может упираться, но тем не менее, что с сетью? Через какую ноду spark данные заливает?

Еще можно попробовать client-ноды на узлах где spark выполняется поднять, и лить через них, это может быть хорошей идеей (spark под es>=2.x ведь не делает их сам?).

Если диски за кешем с батарейкой, то еще nobarrier должен хорошо помочь. Какая FS там кстати?

По сабжу - bulk threadpool не трогать, выше Евгений всё верно сказал, но еще более не стоит трогать количество процессоров :slight_smile:. Поищи еще по коду где оно используется :wink:.

Из моего опыта, ничего хорошего из доведения ES "до идеала", чтобы на импорте в диски или процессор упирался, не получается. В общем обычно достаточно оптимизировать конфиг дисков, поправить под него merge scheduler, и развернуть отдельно мастер/дата/клиент-ноды. Ну и оперативки jvm'ке выдать 30 гигов, это правильно (btw, вариант с выдачей эластику 256Гб оперативки тоже имеет смысл в некоторых, редких, ситуациях).