Сравнение документов внутри индекса

Всем привет.
В индексе есть несколько типов документов. Одни документы - Input, другие - Output.
В каждом документе есть поле session_ID, которое отвечает за уникальность передачи сесии.

Вопросы:

  1. Как связать/сопоставить/посчитать документы с одинаковыми session_ID?
    Потом в идеале вывести полученный результат в Grafana...

  2. Делать сопоставление на стороне эластика или на стороне графаны?

Сколько документов в индексе? Как часто надо сопоставлять?

За день по 200 000. Сопоставление раз в минуту.

Тогда лучше хранить обе части в одном документе. То есть структура будет такая:

{
  "session": "session id goes here",
  "input": {
     "present": true,
     ....
   },
   "output": {
     "present": true,
     ....
   }
}

Так как гарантии того, что output придет после input нет, то и то и другое надо добавлять через doc_as_upsert. Сопоставление становиться тривиальным, так как все части находятся в одном документе.

Грубо говоря через Logstash сделать объединение документов?

Можно и через logstash, но через elasticsearch будет надежнее.

Прошу прощения, Игорь, за свою назойливость, но могли бы вы в нескольких шагах показать неродивому уму как сие делается? Буду очень вам признателен!

Если через logstash, то есть три метода. 1) можно вытащить документ через elasticsearch filter обновить его и проиндексировать опять. Но это вариант медленный и не надежный. 2) можно воспользоваться фильтром aggregate и формировать полный документ в logstash. Это самый быстрый вариант, но если logstash перегружается то можно потерять пары, и вы ограничены одним воркером в logstash. 3) можно индексировать как обычно, используя флаг doc_as_upsert это будет медленнее чем во втором случае и будет создавать больше нагрузки на elasticsearch, но можно при этом обрабатывать входной поток на нескольких воркерах logstash и меньше вероятности потерять события при перезагрузки logstash, особенно если использовать persistent queues.

Игорь, простите за наглость, но можно какой-нибудь "боевой пример" одного из методов? Не знаю даже с какой стороны подступиться к решению проблемы...

Странно. А вы на мои ссылки нажимали? В первом пункте документации очень подробный пример, почти совпадающий с тем, что вам нужно. В 2-ом пункте 5 конкретных и очень подробных примеров покрывающие все аспекты этой проблемы. В 3-м варианте нет примеров - но это просто флаг.

Спасибо

Игорь, здравствуйте. Не могли бы вы на примере показать как это сделать? Я так понимаю, что наиболее быстрый метод из трёх, что вы описали - это второй.

У меня есть 3 типа логов:

[#|2021-04-12T20:29:06.126+0300|INFO|testserv|somesender|_ThreadID=110;_ThreadName=thread-20;Process Instance Id=172.10.11.11:-2c25bc5:178c697e3ac:-29de;Service Assembly Name=FileReaderCA;Activity Name=InitConfig;Some Process Name=fileReader;|Flow::MainFlow::Start::SessionId=172.10.11.11:-2c25bc5:178c697e3ac:-2a8f|#]

[#|2021-04-12T20:24:19.024+0300|INFO|testserv|somesender|_ThreadID=102;_ThreadName=thread-5;Process Instance Id=172.10.11.11:-2c25bc5:178c697e3ac:-2a8c;Service Assembly Name=OutboundAdaptersCA;Activity Name=createTask;Some Process Name=Adapter;|Invoke::Task::Start::SessionId=172.10.11.11:-2c25bc5:178c697e3ac:-2a8f::Some data::pid=17749551::request=<?xml version="1.0" encoding="UTF-8"?><CreateRequest xmlns="[http://someip.ru](http://someip.ru/)"><Bic xmlns="">044525974</Bic><TaskAccount xmlns="">47422810000000000088</TaskAccount><Destination xmlns="">SysReg</Destination></CreateRequest>|#]

[#|2021-04-12T20:24:19.038+0300|INFO|testserv|somesender|_ThreadID=102;_ThreadName=thread-5;Process Instance Id=172.10.11.11:-2c25bc5:178c697e3ac:-2a8c;Service Assembly Name=OutboundAdaptersCA;Activity Name=createTask;Some Process Name=Adapter;|Invoke::Task::End::SessionId=172.10.11.11:-2c25bc5:178c697e3ac:-2a8f::Some data::RegisteredId=12383987141;errorCode=;description=|#]

Данные логи это 3 различных типов запроса - жизненный цикл прохождения одной задачи на сервере. Успешность выполнения задачи - это последовательное прохождение процесса через 3 этапа. Если один из этапов отсутствует, то выполнение задачи считается неуспешным. Мне необходимо посчитать количество таких "успешных троек" и связать их по полю SessionId.

Не могли бы вы продемонстрировать как в Logstash воспользоваться фильтром aggregate для данной задачи? Все поля уже распарсены через Grok.

Какая логика будет у фильтра? Поиск всех одинаковых значений SessionId и добавление в новое поле документа или необходимо "склеивать тройки" в 1 документ/поле и считать склеенные части?