Duration Time между двумя разными Events

Коллеги, привет!
Помогите, пожалуйста, с решением.

Я рассчитываю Duration Time между двумя разными Events в эластике.
Использую следующий метод:

if "COMMONROUTER.OUT.REMOTE" in [grok_reciever] {
elasticsearch {
    hosts => ["data_cluster_elk:9200"]
    ca_file => 'tmp/certs/ca.crt'
    user => 'elastic'
    password => 'secret'
    index => "trans_log-*"
    query => 'grok_sender:"COMMONROUTER.IN" AND grok_procid:%{grok_procid}'
    fields => { "@timestamp" => "timestamp_start" }
    add_tag => [ok]
   }
  }

На выходе получаю следующее: 50% закрывающих events (COMMONROUTER.OUT.REMOTE) имеют доп.поле время старта, но другие 50% evaents строки старт не имеют, при анализе выявил, что IN и OUT "влетают" в ELK в одно и тоже время (данных очень много и лаг между IN\OUT может быть менее секунды).

В связи с этим вопрос: Возможно ли проапгрейдить мой конфиг в LS, чтобы на выходе все 100% закрывающих events имели поля старта транзакции?

Если нет, подскажите, как проделать эту же самую операцию через API ElK. То есть, скажем я раз в 10 секунд обращаюсь к API ELK и ищу закрывающий event, добавляю в него данные о времени из открывающего event и рассчитываю Duration.

А вы можете угадать id открывающего документа по закрывающему? Если да - то лучше всего вместо поиска второго документа сделать update первому.

Игорь,
Все events в рамках скажем одной транзакции имеют одинаковый grok_procid.
То есть мне необходимо добавить в блок COMMONROUTER.IN поле времени из блока COMMONROUTER.OUT.REMOTE.

Теперь главный вопрос, есть ли примеры запросов, как реализовать данную задачу?

Полных примеров нет, но можно что-то состряпать из Logstash configuration: how to call the partial-update API from ElasticSearch output plugin? - Stack Overflow с action=update и вычислением времени в script.

Коллеги, еще раз всем привет.
Помогите решить проблему:

На входе поступают данные в большом количестве, следующего вида:

"_index": "mq",
  "_type": "_doc",
  "_id": "yQ",
  "_version": 1,
  "_score": null,
  "_source": {
    "@timestamp": "2019-10-14T10:43:10.793Z",
    "@version": "1",
    "headers": {
      "http_host": "elk_backend",
      "request_path": "/",
      "content_length": "90533",
      "content_type": "application/json; charset=UTF-8",
      "http_accept": null,
      "accept_encoding": "gzip,deflate",
      "http_version": "HTTP/1.1",
      "request_method": "POST",
      "http_user_agent": "Apache-HttpClient/4.5.8 (Java/1.8.0_121)"
    },
    "loggingMessages": [
      {
        "description": "Start",
        "starttime": "2019",
        "eventtype": "32",
        "msgtype": "otb",
        "mqmdmsgid": null,
        "msgid": "6a1",
        "messagestatusid": 0,
        "reciever": "",
        "mqmdcorrelid": null,
        "procid": null,
        "sender": "Test-1",
        "exception": null
      },
      {
        "description": "to",
        "starttime": "2019",
        "eventtype": "34",
        "msgtype": "Rs",
        "mqmdmsgid": "414",
        "msgid": "219",
        "messagestatusid": 0,
        "reciever": "test2a",
        "mqmdcorrelid": "000",
        "procid": "ebb",
        "sender": "Test2",
        "exception": null
      },

Как видно, поле loggingMessages имеет в своей структуре данные типа json, таких блоков json в поле loggingMessages достигает 100 возможно 300 блоков.

Когда не было высокой нагрузки эту задачу я решал через LS со следующим конфигом:

filter {
 split {
 field => "[loggingMessages]" }

Как только дали нагрузку со стороны источника данных, решено было перенести обработку на Iingest Node:

Со следующим конфигом:

 PUT _ingest/pipeline/mq
{
"description" : "mq",
"processors" : [ 
{
"split": {
"field": "loggingMessages",
"separator": "\n" 
}
}
]
}

В итоге получил следующую ошибку:

[logstash.outputs.elasticsearch] retrying failed action with response code: 500 ({"type"=>"exception", "reason"=>"java.lang.IllegalArgumentException: java.lang.IllegalArgumentException: field [loggingMessages] of type [java.util.ArrayList] cannot be cast to [java.lang.String]", "caused_by"=>{"type"=>"illegal_argument_exception", "reason"=>"java.lang.IllegalArgumentException: field [loggingMessages] of type [java.util.ArrayList] cannot be cast to [java.lang.String]", "caused_by"=>{"type"=>"illegal_argument_exception", "reason"=>"field [loggingMessages] of type [java.util.ArrayList] cannot be cast to [java.lang.String]"}}, "header"=>{"processor_type"=>"split"}})

Подскажите, что делать? Для корректного разбора поля loggingMessages

Ваши вопросы было бы гораздо проще читать если бы они были отформатированы. Я отформатировал вас предыдущий пост для примера. Для форматирования можно воспользоваться тройными кавычками ``` перед и после сегмента кода.

Да, так читается проще. Учту на будущее. Можно ли такое же простое решение для вопроса выше? =)

This topic was automatically closed 28 days after the last reply. New replies are no longer allowed.