Конфигурация Threadpool


(Evgeny Lazarev) #1

Добрый день.
Разбираюсь с возможными конфигурациями ES. Сейчас пытаюсь играться с настройками для ускорения заливки данных (hdfs -> elasticsearch-spark -> ES).
Пытался пару раз изменять настройки threadpool.bulk (увеличивал количество потоков), но данные настройки не применялись. После копания в сорсах ES, обнаружил, что эти вещи регулируются глубоко в ES и зависят от количества ядер (Runtime.getRuntime().availableProcessors()) и по-умолчанию не превышают 32.
С другой стороны, есть свойство es.processors.override, которым можно это справить...
Собственно, вопроса 2:

  • какие причины того, что было сделано именно так? Судя по докам, мануалам и гуглению, эти настройки в целом легко и часто меняются (подозреваю, что раньше оно именно так и было, но в какой то момент решили это поменять).

  • в принципе, имеет ли смысл проверять данные настройки для оптимизации заливки данных? Сейчас на своем тестовом кластере я не вижу никакой просадки ни по каким параметрам - ресурсы задействованы максимум на половину (мониторю заливку marvel`ом + nmon (linux)).


(Igor Motov) #2

Какую версию elasticsearch вы используете?
Как вы пытались менять настройки?
Как вы определили, что новые настройки не применялись?


(Evgeny Lazarev) #3

Версия 2.3.2

Менял настройки и изменением elasticsearch.yml, и через REST-API.

Проверял разными способами:

  • /_cat/thread_pool?v - в момент заливки данных bulk.active никогда не превышал 32 (при этом bulk.queue не был пустым)
  • /_nodes/thread_pool?pretty - bulk.min=bulk.max=32
  • Также отдельно проверял эти настройки на локальном ES - приконнектился debug`ом и посмотрел данные параметры при запуске ES и при разных вызовах API (типа /_cat/thread_pool)

Везде значения были отличны от тех, что указаны в elasticsearch.yml.


(Igor Motov) #4

Да, начиная с 2.1.2 elasticsearch не позволяет количество потоков в bulk и index превышать количество процессоров, поскольку это только замедляет процесс индексирования вместо его ускорения. См дискуссию тут - https://github.com/elastic/elasticsearch/issues/15582


(Evgeny Lazarev) #5

Спасибо за ссылку на issue.
Мотив понятен.

PS. Хотя сейчас у меня ровно такая же ситуация, как у последнего комментатора - кластер используется не на полную мощность :slight_smile:


(Igor Motov) #6

А что с диском твориться на нодах? Вы уверены, что это у Вас не диск тормозит?

А вы пытались воспользоваться советами в https://www.elastic.co/blog/performance-indexing-2-0 ?


(Evgeny Lazarev) #7

И Cloudera Manager, и nmon показывают, что диски используются на ~10%, в пиках не больше 25%. При этом я сталкивался с тем, что при неправильной настройке индексирования диски действительно не справлялись, и загрузка была 100%. Сейчас смотрю ночные графики работы джобы по заливке данных - скорость постоянна и равно ~510 МБ/с. При этом на кластере в рабочее время можно видеть всплески активности до 5 Гб (от других джоб).[quote="Igor_Motov, post:6, topic:51054"]
А вы пытались воспользоваться советами в https://www.elastic.co/blog/performance-indexing-2-0 ?
[/quote]

  1. Store throttling is now automatic - не трогал, но, как я понял, это и не надо теперь регулировать.
  2. Multiple path.data - вот здесь у меня есть вопросы на тему того, как правильно сделать..
    Сейчас у меня конфигурация такова, что каждая нода имеет доступ к каждому диску (9 нод, 10 дисков). Не уверен, что это до конца правильно..
  3. The Auto-ID optimization is removed - тоже ничего не делал, просто отдал это Sparkу: JavaEsSpark.saveToEs(resultRdd, ImmutableMap.of("es.resource", esResource, "es.mapping.id", esMappingId, "es.nodes", esNodes));
  4. More indexing changes... - с компрессией пока не разбирался...

В целом, за вчерашний день копания в сорсах ES я обнаружил несколько пропертей, которые могут в теории ускорить индексирование.
Буду проверять их после того, как ES закончит создавать реплики для моего тестового индекса. К сожалению, сделать копию 10 ТБ быстро не получается :slight_smile:


(Igor Motov) #8

Не понял, это как? То есть у них у всех 10 путей в path.data прописано и эти пути одинаковые? А диски как подключены?

У вас размер bulk запроса какой?

Вы не могли бы curl localhost:9200/_nodes/hot_threads?ignore_idle_threads=false&threads=10000 запустить несколько раз, сохранить результат где-нибудь и прислать сюда сылку?


(Evgeny Lazarev) #9

Да, выглядит это вот так (для каждой из нод):
path.data = ["/data1/es", "/data2/es",...]
/data1 - это путь до определенного диска, их 10 штук и они никак между собой не взаимодействуют.[quote="Igor_Motov, post:8, topic:51054"]
У вас размер bulk запроса какой?
[/quote]
Я задаю spark.es.batch.size.bytes=30mb и spark.es.batch.size.entries=15000, размер самого документа примерно 2кб.[quote="Igor_Motov, post:8, topic:51054"]
Вы не могли бы curl localhost:9200/_nodes/hot_threads?ignore_idle_threads=false&threads=10000 запустить несколько раз, сохранить результат где-нибудь и прислать сюда сылку?
[/quote]

Выложу завтра, когда закончится копирование шардов и я смогу запустить индексирование.

Вообще, я смотрел hot_threads, в топе всегда были только Lucene merge thread и Bulk.


(Igor Motov) #10

Если я вас правильно понял, то /data1/es/ на каждой ноде использует один и тот же физический диск. Поскольку elasticsearch не знает что это один и тот же диск, то он может разместить на этом диске и праймари и реплику, что может, в свою очередь, привести к полной потере данных, если этот диск накроется. Я не думаю, что такое размещение данных - хорошая идея. К тому же, учитывая как шарды распределяются между дисками, я не удивлюсь, если отдельные диски в такой конфигурации перегружаются, когда как другие диски - простаивают.


(Evgeny Lazarev) #11

Да, поняли вы правильно. Данную конфигурацию я делал на первых порах... В общем, поэтому и спрашивал..[quote="Igor_Motov, post:10, topic:51054"]
Поскольку elasticsearch не знает что это один и тот же диск, то он может разместить на этом диске и праймари и реплику, что может, в свою очередь, привести к полной потере данных, если этот диск накроется
[/quote]

Да, тут вы конечно тоже правы. С другой, все таки я собираюсь следовать заветам многих спецов и не использовать ES как primary-хранилище. Информация будет дублироваться, другой вопрос, что восстанавливать её даже внутри кластера будет проблематично: у меня сегодня упала одна дата нода (по моей вине) -> рекавери растянулось на весь день...

Вполне возможно мне просто везло... Но опять же - как я ни старался, упереться в какие то конкретные ресурсы мне пока так и не удалось.


(Evgeny Lazarev) #12


Выложил архив с 5-ю снэпшотами тредов + графиками загрузки всего кластера (не только ES), а также графики марвела.

По существу: сейчас конфигурация кластера такая, как я описывал ранее. Плюс - хранится один индекс в 10ТБ + 1 реплика (6.5 миллиардов доков).

В момент получения информации в кластер заливаются те же данные (в другой индекс).
Параметры индекса:
{"settings":{"index":{"number_of_replicas":"1","number_of_shards":"50","refresh_interval":"-1","translog":{"sync_interval":"10s","flush_threashold_ops":"100000","flush_threashold_size":"4098mb","interval":"10s"}}}}

Размер одного дока примерно 2кб


(Igor Motov) #13

Есть еще пара вопросов

  1. Сколько физической памяти на машинах и какой размер хипа на нодах?
  2. Похоже, что данные обновляются с помощью update, это сделано потому, что у вас нет доступа ко всей записи, когда вы ее обновляете?
  3. Вы используете этот кластер для поиска во время индексации?

(Evgeny Lazarev) #14

Физической памяти около 520Гб (vmstat говорит 529) на каждой ноде. Хипа выделяю 30Гб.

Доступ у меня полный - загружаю из JSONа, который сам конструировал до этого. Лежит он в hdfs, грузится Sparkом, В Spark все это передается с параметром upsert, что, как я понял, в моем случае должно заменяться на insert (так как данные пишу в пустой индекс).[quote="Igor_Motov, post:13, topic:51054"]
Вы используете этот кластер для поиска во время индексации?
[/quote]

Нет. Вообще, я планирую делать 2 конфигурации кластера - одну для заливки данных, другую для поиска.

Сейчас все сводится к тому, что надо очень много данных залить в ES. В дальнейшем по ним будет только поиск. Обновление будет редким явлением.


(Igor Motov) #15

Я спросил про память потому, что на некоторых нодах памяти явно не хватает - makea35, например. Надо разобраться, куда эта память уходит, и исправить ситуацию. Если поиск при этом не происходит, то, возможно, проблема с документами, которые вы индексируете или bulk queue слишком длинная.

Что касается дисков, то судя по низкой загрузке CPU и высоком Load Average, я, думаю, что проблема все-таки с I/O. Я бы начал с упрощения вашей дисковой организации - надо каждой ноде дать свой диск, желательно SSD и, желательно локальный, а не подсоединенный через сеть.

Во время заливки, можно еще попробовать переключиться на "index.translog.durability": "async" и увеличить sync_interval до 60 сек. Если вам отключат электричество во всем компьютерном центре, то можно потерять результат последней минуты работы, но в вашем случае это не страшно.

flush_threashold_ops можно выкинуть - все-равно в имени ошибка и оставить только flush_threashold_size.


(Evgeny Lazarev) #16

Спасибо за советы.

Все предложенные варианты попробую..


(Andrew Grigorev) #17

Покажи elasticsearch.yml?

Если диски не SSD и нет хорошего writeback-кеша - отдать каждой ноде по одному диску и поставить "index.merge.scheduler.max_thread_count: 1". Не посылает ли spark при на каждый bulk-запрос лишних /_forcemerge?

Правильно понимаю что мастер-ноды/дата-ноды не разделены? Раздели, можно наверное мастера запустить на тех же хостах раз они всё равно курят и оперативки столько лишней.

Ничего не сказано про сеть, конечно вряд ли ты про нее забыл когда смотрел во что оно может упираться, но тем не менее, что с сетью? Через какую ноду spark данные заливает?

Еще можно попробовать client-ноды на узлах где spark выполняется поднять, и лить через них, это может быть хорошей идеей (spark под es>=2.x ведь не делает их сам?).


(Andrew Grigorev) #18

Если диски за кешем с батарейкой, то еще nobarrier должен хорошо помочь. Какая FS там кстати?


(Andrew Grigorev) #19

По сабжу - bulk threadpool не трогать, выше Евгений всё верно сказал, но еще более не стоит трогать количество процессоров :slight_smile:. Поищи еще по коду где оно используется :wink:.


(Andrew Grigorev) #20

Из моего опыта, ничего хорошего из доведения ES "до идеала", чтобы на импорте в диски или процессор упирался, не получается. В общем обычно достаточно оптимизировать конфиг дисков, поправить под него merge scheduler, и развернуть отдельно мастер/дата/клиент-ноды. Ну и оперативки jvm'ке выдать 30 гигов, это правильно (btw, вариант с выдачей эластику 256Гб оперативки тоже имеет смысл в некоторых, редких, ситуациях).