Конфигурация Threadpool

discovery.zen.ping.unicast.hosts: ["makea30","makea31","makea32","makea33","makea34"]
discovery.zen.ping.multicast.enabled: false
 
bootstrap.mlockall: true
node.master: true
 
cluster.name: test-es-cluster
node.name: "makea30"
 
network.host: makea30
path.data: ["/data1/es", "/data2/es"]
 
threadpool.bulk.queue_size: 5000
threadpool.index.queue_size: 5000
 
indices.store.throttle.max_byte_per_sec: 30Gb
indices.memory.shard_inactive_time: 30m
indices.memory.index_buffer_size: 30%

Первые 5 машин - мастера. Остальные 4 - дата ноды. Конфигурация там аналогична.
Маплю каждую ноду на 2 диска (пересекаются между нодами).

Индекс создаю со следующими параметрами:
"index":{
"refresh_interval":"-1",
"number_of_shards":"50",
"number_of_replicas":"0",
"translog":{
"durability":"async",
"interval":"60s",
"sync_interval":"60s",
"flush_threshold_size":"20Gb"
},
"unassigned.node_left.delayed_timeout": "10m",
"store.stats_refresh_interval": "30s",
"merge.scheduler.max_thread_count": "1",
"merge.policy.floor_segment": "4Gb",
"merge.policy.max_merge_at_once": "15",
"merge.policy.segment_per_tier": "30",
"merge.policy.max_merged_segment": "30Gb"
}

forcemerge проверю.

Спарк льет через 30-ю ноду. Cloudera-менеджер затыки на сеть не показывает.

Про FS и диски ничего сказать не могу.

В такой конфигурации, в целом, уже достаточно хорошая скорость.
Единственное, что бросается в глаза - на некоторых нодах периодически случаются затыки в IO, балковая очередь растет (но не переполняется). Не нравится мне это тем, что когда такое происходит, остальные узлы кластера начинают простаивать.

Первые 5 машин - мастера. Остальные 4 - дата ноды

Они у тебя не выделенные мастера, а master+data? Попробуй запустить master-ноды в отдельных процессах на отдельных портах (или адресах).

Обычно 3х мастеров достаточно, 5 - хуже.

Маплю каждую ноду на 2 диска (пересекаются между нодами).

Два диска в смысле в зеркале, или в смысле просто по два отдельных тома?

edit: а, вижу что два отдельных. фиг знает, может и есть смысл, но я бы один оставил... ты тестил? с одним медленнее?

"flush_threshold_size":"20Gb"

Оно правда помогает? Вроде дефолтных 512Мб вполне достаточно должно быть, если spark по 30мб bulk'и шлет. Ну и памяти у него столько под транслог всё равно нет, я бы ограничил хотя бы в 2Гб (а лучше бы не трогал). Не понимаю как большой буффер для translog может улучшить производительность, его же всё равно писать на диск потом. Чанка в 512Мб обычно достаточно :slight_smile:. Тем более у тебя всё равно потом fsync только раз в 60 секунд - java-вский буффер просто уйдет в буффер ОС и попадет на диск только в течение минуты. Скорее всего уход в IO и лок кластера у тебя как раз из за этого.

"number_of_shards":"50",
"number_of_replicas":"0",

Извращенец :-). Но у меня самого та же фигня, правда вместо кучи шардов - куча индексов... У тебя действительно одна машина с необходимым объемом записи не справляется? А то с таким конфигом смысла в кластере не много.

Если есть возможность расшардироваться на уровне Spark (и раскидать данные по нескольким индексам, а не шардам в одном индексе) - просто запусти на каждой машине отдельный single-node elastic, прогони импорт (всё линейно замасштабируется, эластик отработает на полную), а потом уже в кластер объединяй.

да, мастер+дата

планирую потестить.

У меня данных много. При репликации в 1 это 20ТБ. ES пишет сутки, а то и больше, так как на сервере физически есть не только ES. Мне проще потом проставить реплику. Тем более, что однажды один диск стал недоступным и все вообще легло из-за рекавери. Проблема ещё и в том, что эти 10ТБ начальные - это только тестовый вариант. Реальных данных ещё больше.

Вот этот момент не совсем понятен. Точнее - действия с моей стороны ясны, но непонятно, что будет происходить при объединении всех нод в кластер.

Нет. Скорость линейно падает и в какой то момент начинают уже таски Спарка валится по таймауту.

Тут, кстати, есть один интересный вопрос (один из главных, с котором я столкнулся) - почему появляются паузы при индексировании? (на графике индексирования марвела есть паузы в ноль секунд на 10-15 примерно)

Вот этот момент не совсем понятен. Точнее - действия с моей стороны ясны, но непонятно, что будет происходить при объединении всех нод в кластер.

Нода подключается к мастеру и говорит "смотри у меня тут вот такой индекс валяется, я его подниму?". Вроде ничего плохого.

Кстати, наткнулся тут вот на такую штуку - http://www.poudro.com/blog/building-an-elasticsearch-index-offline-using-hadoop-pig/. Это еще больший изврат, но кажется может иметь смысл.

Это еще больший изврат, но кажется может иметь смысл.

На самом деле, как минимум, написав свой индексатор сможешь понять какие могут быть затыки в индексировании у ES. Не все наверное, но всё равно. Хотя конечно покурить код самого ES было бы правильнее, но это сложно, проще свой индексатор написать :slight_smile: .

Тут, кстати, есть один интересный вопрос (один из главных, с котором я столкнулся) - почему появляются паузы при индексировании? (на графике индексирования марвела есть паузы в ноль секунд на 10-15 примерно)

Попробуй замониторить как у тебя из spark'а туда данные уходят, может оно там на ответах эластика синхронизируется, и дружно уходит в формирование новых bulk'ов? Я когда-то к такому выводу пришел и забил, но не ручаюсь за корректность этого вывода.

В своем кластере вижу как в CPU уходят синхронно по-очереди то эластики, то импортеры.

Еще в тему извращений, https://github.com/MyPureCloud/elasticsearch-lambda - чуваки заморочились и на голом mapreduce развили вариант загрузки с single-node/single-index per loader (как я выше описывал) до single-shard per loader, только после создания индекса зачем-то (зачем?) отгружают его через snapshot (и как потом шарды из отдельных снэпшотов в индекс склеивать? или снэпшот один и шарды туда через черную магию запихиваюся?).

Тема также обсуждалась в гуглогруппе - https://groups.google.com/forum/#!topic/elasticsearch/xU5Faf3l5IM, но я там ничего интересного не вижу. Есть ссылка на wonderdog (ruby-шная поделка для обычного bulk load через hadoop, умеет запускать клиентскую ноду, вероятно не работает с es-2.x), и на статью про свежие версии lucene (если я правильно понимаю - в es-2.x это всё уже работает из коробки, но он всё равно не жрет 100% CPU / IO как обещает заголовок статьи, впрочем статью наверно стоило бы прочитать, но сайт лежит).

Вообще, если всерьез такие штуки обсуждать нужно для начала ответить на вопросы:

  • Какие проблемы решает Elasticsearch при массовой загрузке данных, с которыми придется столкнуться при создании индекса напрямую через lucene?

  • И наоборот, какие проблемы решать не нужно? Можно ли принципиально ускорить процесс построения индексов, если отбросить необходимость обновления этого индекса и поиска по нему в процессе заполнения?

  • Сработает ли подход "сгенерировать индекс напрямую через lucene и скормить его файлы ноде ES при запуске"? Какие подводные камни?

Я на них ответить достаточно подробно не могу :-). Так что это всё пока на пустом месте... хотя сделать и посмотреть это тоже вариант.