Добрый день.
Разбираюсь с возможными конфигурациями ES. Сейчас пытаюсь играться с настройками для ускорения заливки данных (hdfs -> elasticsearch-spark -> ES).
Пытался пару раз изменять настройки threadpool.bulk (увеличивал количество потоков), но данные настройки не применялись. После копания в сорсах ES, обнаружил, что эти вещи регулируются глубоко в ES и зависят от количества ядер (Runtime.getRuntime().availableProcessors()) и по-умолчанию не превышают 32.
С другой стороны, есть свойство es.processors.override, которым можно это справить...
Собственно, вопроса 2:
какие причины того, что было сделано именно так? Судя по докам, мануалам и гуглению, эти настройки в целом легко и часто меняются (подозреваю, что раньше оно именно так и было, но в какой то момент решили это поменять).
в принципе, имеет ли смысл проверять данные настройки для оптимизации заливки данных? Сейчас на своем тестовом кластере я не вижу никакой просадки ни по каким параметрам - ресурсы задействованы максимум на половину (мониторю заливку marvel`ом + nmon (linux)).
Менял настройки и изменением elasticsearch.yml, и через REST-API.
Проверял разными способами:
/_cat/thread_pool?v - в момент заливки данных bulk.active никогда не превышал 32 (при этом bulk.queue не был пустым)
/_nodes/thread_pool?pretty - bulk.min=bulk.max=32
Также отдельно проверял эти настройки на локальном ES - приконнектился debug`ом и посмотрел данные параметры при запуске ES и при разных вызовах API (типа /_cat/thread_pool)
Везде значения были отличны от тех, что указаны в elasticsearch.yml.
Да, начиная с 2.1.2 elasticsearch не позволяет количество потоков в bulk и index превышать количество процессоров, поскольку это только замедляет процесс индексирования вместо его ускорения. См дискуссию тут - https://github.com/elastic/elasticsearch/issues/15582
И Cloudera Manager, и nmon показывают, что диски используются на ~10%, в пиках не больше 25%. При этом я сталкивался с тем, что при неправильной настройке индексирования диски действительно не справлялись, и загрузка была 100%. Сейчас смотрю ночные графики работы джобы по заливке данных - скорость постоянна и равно ~510 МБ/с. При этом на кластере в рабочее время можно видеть всплески активности до 5 Гб (от других джоб).[quote="Igor_Motov, post:6, topic:51054"]
А вы пытались воспользоваться советами в Performance Considerations for Elasticsearch 2.0 Indexing | Elastic Blog ?
[/quote]
Store throttling is now automatic - не трогал, но, как я понял, это и не надо теперь регулировать.
Multiple path.data - вот здесь у меня есть вопросы на тему того, как правильно сделать..
Сейчас у меня конфигурация такова, что каждая нода имеет доступ к каждому диску (9 нод, 10 дисков). Не уверен, что это до конца правильно..
The Auto-ID optimization is removed - тоже ничего не делал, просто отдал это Sparkу: JavaEsSpark.saveToEs(resultRdd, ImmutableMap.of("es.resource", esResource, "es.mapping.id", esMappingId, "es.nodes", esNodes));
More indexing changes... - с компрессией пока не разбирался...
В целом, за вчерашний день копания в сорсах ES я обнаружил несколько пропертей, которые могут в теории ускорить индексирование.
Буду проверять их после того, как ES закончит создавать реплики для моего тестового индекса. К сожалению, сделать копию 10 ТБ быстро не получается
Не понял, это как? То есть у них у всех 10 путей в path.data прописано и эти пути одинаковые? А диски как подключены?
У вас размер bulk запроса какой?
Вы не могли бы curl localhost:9200/_nodes/hot_threads?ignore_idle_threads=false&threads=10000 запустить несколько раз, сохранить результат где-нибудь и прислать сюда сылку?
Да, выглядит это вот так (для каждой из нод):
path.data = ["/data1/es", "/data2/es",...]
/data1 - это путь до определенного диска, их 10 штук и они никак между собой не взаимодействуют.[quote="Igor_Motov, post:8, topic:51054"]
У вас размер bulk запроса какой?
[/quote]
Я задаю spark.es.batch.size.bytes=30mb и spark.es.batch.size.entries=15000, размер самого документа примерно 2кб.[quote="Igor_Motov, post:8, topic:51054"]
Вы не могли бы curl localhost:9200/_nodes/hot_threads?ignore_idle_threads=false&threads=10000 запустить несколько раз, сохранить результат где-нибудь и прислать сюда сылку?
[/quote]
Выложу завтра, когда закончится копирование шардов и я смогу запустить индексирование.
Вообще, я смотрел hot_threads, в топе всегда были только Lucene merge thread и Bulk.
Если я вас правильно понял, то /data1/es/ на каждой ноде использует один и тот же физический диск. Поскольку elasticsearch не знает что это один и тот же диск, то он может разместить на этом диске и праймари и реплику, что может, в свою очередь, привести к полной потере данных, если этот диск накроется. Я не думаю, что такое размещение данных - хорошая идея. К тому же, учитывая как шарды распределяются между дисками, я не удивлюсь, если отдельные диски в такой конфигурации перегружаются, когда как другие диски - простаивают.
Да, поняли вы правильно. Данную конфигурацию я делал на первых порах... В общем, поэтому и спрашивал..[quote="Igor_Motov, post:10, topic:51054"]
Поскольку elasticsearch не знает что это один и тот же диск, то он может разместить на этом диске и праймари и реплику, что может, в свою очередь, привести к полной потере данных, если этот диск накроется
[/quote]
Да, тут вы конечно тоже правы. С другой, все таки я собираюсь следовать заветам многих спецов и не использовать ES как primary-хранилище. Информация будет дублироваться, другой вопрос, что восстанавливать её даже внутри кластера будет проблематично: у меня сегодня упала одна дата нода (по моей вине) -> рекавери растянулось на весь день...
Вполне возможно мне просто везло... Но опять же - как я ни старался, упереться в какие то конкретные ресурсы мне пока так и не удалось.
Выложил архив с 5-ю снэпшотами тредов + графиками загрузки всего кластера (не только ES), а также графики марвела.
По существу: сейчас конфигурация кластера такая, как я описывал ранее. Плюс - хранится один индекс в 10ТБ + 1 реплика (6.5 миллиардов доков).
В момент получения информации в кластер заливаются те же данные (в другой индекс).
Параметры индекса: {"settings":{"index":{"number_of_replicas":"1","number_of_shards":"50","refresh_interval":"-1","translog":{"sync_interval":"10s","flush_threashold_ops":"100000","flush_threashold_size":"4098mb","interval":"10s"}}}}
Физической памяти около 520Гб (vmstat говорит 529) на каждой ноде. Хипа выделяю 30Гб.
Доступ у меня полный - загружаю из JSONа, который сам конструировал до этого. Лежит он в hdfs, грузится Sparkом, В Spark все это передается с параметром upsert, что, как я понял, в моем случае должно заменяться на insert (так как данные пишу в пустой индекс).[quote="Igor_Motov, post:13, topic:51054"]
Вы используете этот кластер для поиска во время индексации?
[/quote]
Нет. Вообще, я планирую делать 2 конфигурации кластера - одну для заливки данных, другую для поиска.
Сейчас все сводится к тому, что надо очень много данных залить в ES. В дальнейшем по ним будет только поиск. Обновление будет редким явлением.
Я спросил про память потому, что на некоторых нодах памяти явно не хватает - makea35, например. Надо разобраться, куда эта память уходит, и исправить ситуацию. Если поиск при этом не происходит, то, возможно, проблема с документами, которые вы индексируете или bulk queue слишком длинная.
Что касается дисков, то судя по низкой загрузке CPU и высоком Load Average, я, думаю, что проблема все-таки с I/O. Я бы начал с упрощения вашей дисковой организации - надо каждой ноде дать свой диск, желательно SSD и, желательно локальный, а не подсоединенный через сеть.
Во время заливки, можно еще попробовать переключиться на "index.translog.durability": "async" и увеличить sync_interval до 60 сек. Если вам отключат электричество во всем компьютерном центре, то можно потерять результат последней минуты работы, но в вашем случае это не страшно.
flush_threashold_ops можно выкинуть - все-равно в имени ошибка и оставить только flush_threashold_size.
Если диски не SSD и нет хорошего writeback-кеша - отдать каждой ноде по одному диску и поставить "index.merge.scheduler.max_thread_count: 1". Не посылает ли spark при на каждый bulk-запрос лишних /_forcemerge?
Правильно понимаю что мастер-ноды/дата-ноды не разделены? Раздели, можно наверное мастера запустить на тех же хостах раз они всё равно курят и оперативки столько лишней.
Ничего не сказано про сеть, конечно вряд ли ты про нее забыл когда смотрел во что оно может упираться, но тем не менее, что с сетью? Через какую ноду spark данные заливает?
Еще можно попробовать client-ноды на узлах где spark выполняется поднять, и лить через них, это может быть хорошей идеей (spark под es>=2.x ведь не делает их сам?).
По сабжу - bulk threadpool не трогать, выше Евгений всё верно сказал, но еще более не стоит трогать количество процессоров . Поищи еще по коду где оно используется .
Из моего опыта, ничего хорошего из доведения ES "до идеала", чтобы на импорте в диски или процессор упирался, не получается. В общем обычно достаточно оптимизировать конфиг дисков, поправить под него merge scheduler, и развернуть отдельно мастер/дата/клиент-ноды. Ну и оперативки jvm'ке выдать 30 гигов, это правильно (btw, вариант с выдачей эластику 256Гб оперативки тоже имеет смысл в некоторых, редких, ситуациях).
Apache, Apache Lucene, Apache Hadoop, Hadoop, HDFS and the yellow elephant
logo are trademarks of the
Apache Software Foundation
in the United States and/or other countries.