Behavior of the Aggregate filter when timeout_timestamp_field is set

This is about the timeout_timestamp_field option of the Aggregate filter plugin.

From the history of the feature request and the wording of the documentation, I assumed that setting this option would make timeouts be evaluated against the log's timestamp rather than the system clock. When I actually ran it, however, it behaved as if timeouts were still evaluated against system time.
(It seems someone had the same question in the past: https://discuss.elastic.co/t/aggregate-filter-with-timeout-timestamp-field/144260)

If anyone knows the correct behavior when this option is set, I would appreciate an explanation.


[Verification]
Configuration used for the test (.conf):

input {
  stdin{}
}

filter {
   grok {
     match => [ "message", "%{NOTSPACE:event_time} %{LOGLEVEL:loglevel} - %{NOTSPACE:taskid} - %{NOTSPACE:logger} - %{WORD:label}( - %{INT:duration:int})?" ]
   }
   date{
     match => [ "event_time", "yyyy-MM-dd'T'HH:mm:ss.SSS"]
     target => "event_time"
   }

   if [logger] == "TASK_START" {
     aggregate {
       task_id => "%{taskid}"
       timeout_timestamp_field => "event_time"
       code => "map['sql_duration'] = 0"
       map_action => "create"
       timeout => 30
       push_map_as_event_on_timeout => true
     }
   }

   if [logger] == "SQL" {
     aggregate {
       task_id => "%{taskid}"
       code => "map['sql_duration'] += event.get('duration')"
       map_action => "update"
     }
   }

   if [logger] == "TASK_END" {
     aggregate {
       task_id => "%{taskid}"
       code => "event.set('sql_duration', map['sql_duration'])"
       map_action => "update"
       end_of_task => true
     }
   }
}
output{
    stdout{}
}

Input:

2023-06-17T07:30:00.000 INFO - 12345 - TASK_START - start
2023-06-17T07:30:40.000 INFO - 12345 - SQL - sqlQuery1 - 12  ← this line arrives 40 seconds after the task started, so I expected the timeout to fire at this point and the map to be pushed as an event

Result:
I confirmed that about 30 seconds after the task started, the values stored in the map were output as an event:

{
        "@version" => "1",
      "@timestamp" => 2023-06-17T15:31:35.935962Z,
    "sql_duration" => 12
}

It took several hours of experimentation to find out what this option does :slight_smile: Suppose you have four entries in the log file

2023-05-17T07:30:00.000 INFO - 12345 - TASK_START - start
2023-05-17T07:30:13.000 INFO - 12345 - SQL - sqlQuery1 - 12
2023-05-17T07:40:00.000 INFO - 12345 - TASK_START - start
2023-05-17T07:40:13.000 INFO - 12345 - SQL - sqlQuery1 - 11

The first two entries are 13 seconds apart, then there is a gap of 10 minutes and another pair of entries 13 seconds apart. I removed map_action => "create" so that it uses the default value (create_or_update) because you need the third log entry to run through the "update" path in the code. With this configuration

grok { match => { "message" => "^%{NOTSPACE:event_time} %{LOGLEVEL} - %{NOTSPACE:taskid} - %{NOTSPACE:logger} - %{WORD}( - %{INT:duration:int})?" } }
date{ match => [ "event_time", "yyyy-MM-dd'T'HH:mm:ss.SSS"] target => "event_time" }
if [logger] == "TASK_START" {
    aggregate {
        task_id => "%{taskid}"
        timeout_timestamp_field => "event_time"
        code => "map['sql_duration'] = 0"
        timeout => 20
        push_map_as_event_on_timeout => true
    }
}

if [logger] == "SQL" {
    aggregate {
        task_id => "%{taskid}"
        code => "map['sql_duration'] += event.get('duration')"
        map_action => "update"
    }
}

mutate { remove_field => [ "message", "@version" ] }

this will result in

{
"@timestamp" => 2023-06-17T22:23:18.068229289Z,
"event_time" => 2023-05-17T11:30:00.000Z,
    "taskid" => "12345",
    "logger" => "TASK_START"
}
{
"@timestamp" => 2023-06-17T22:23:18.069230438Z,
"event_time" => 2023-05-17T11:30:13.000Z,
    "taskid" => "12345",
    "logger" => "SQL",
  "duration" => 12
}

This is just processing the first two lines in the log. When the aggregate gets to the next line it notices that event_time is 9 minutes 47 seconds later than the last entry it saw. That is more than 20 seconds, so it forces a timeout, takes the map, and creates an event from it (but puts it aside for a moment). It then finishes processing the third log entry.

{
"@timestamp" => 2023-06-17T22:23:18.069661865Z,
"event_time" => 2023-05-17T11:40:00.000Z,
    "taskid" => "12345",
    "logger" => "TASK_START"
}

then it issues the event containing the map.

{
  "@timestamp" => 2023-06-17T22:23:18.179425126Z,
"sql_duration" => 12
}

then it processes the fourth log entry

{
"@timestamp" => 2023-06-17T22:23:18.069889961Z,
"event_time" => 2023-05-17T11:40:13.000Z,
    "taskid" => "12345",
    "logger" => "SQL",
  "duration" => 11
}

There is no new TASK_START event, so we have to wait for the timeout to occur which happens 25 seconds later (timeouts are only checked every 5 seconds)

{
  "@timestamp" => 2023-06-17T22:23:43.076634051Z,
"sql_duration" => 11
}
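The forced-timeout behavior described above can be condensed into a small Ruby simulation. This is only an illustrative model with made-up names (`maps`, `flushed`), not the plugin's actual implementation: each map remembers the newest event_time it has seen, and a later event whose timestamp is more than `timeout` seconds past that forces the map to be flushed before the new event is processed.

```ruby
require 'time'

TIMEOUT = 20 # seconds, matching `timeout => 20` in the config above

# The four log lines from the example: task_id, logger, event_time, duration
events = [
  ["12345", "TASK_START", Time.parse("2023-05-17T07:30:00Z"), nil],
  ["12345", "SQL",        Time.parse("2023-05-17T07:30:13Z"), 12],
  ["12345", "TASK_START", Time.parse("2023-05-17T07:40:00Z"), nil],
  ["12345", "SQL",        Time.parse("2023-05-17T07:40:13Z"), 11],
]

maps    = {} # task_id => { last_seen:, sql_duration: }
flushed = [] # events produced by push_map_as_event_on_timeout

events.each do |taskid, _logger, t, duration|
  m = maps[taskid]
  # Forced timeout: this event's timestamp is too far past the last one seen
  if m && t - m[:last_seen] > TIMEOUT
    flushed << { "sql_duration" => m[:sql_duration] }
    maps.delete(taskid)
  end
  m = maps[taskid] ||= { last_seen: t, sql_duration: 0 } # create_or_update
  m[:last_seen] = t
  m[:sql_duration] += duration if duration
end

# End of input: the real plugin would evict the remaining map on a later
# periodic flush (wall clock); here we simply drain it.
maps.each_value { |m| flushed << { "sql_duration" => m[:sql_duration] } }

p flushed # => [{"sql_duration"=>12}, {"sql_duration"=>11}]
```

The first map is evicted only when the third log line arrives with a 9-minute-47-second gap; the second map has no later event to evict it, which is why the real plugin only emits it on a periodic wall-clock flush, about 25 seconds later.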

Thank you for the thorough answer.
I checked the example you provided.

2023-05-17T07:30:00.000 INFO - 12345 - TASK_START - start
2023-05-17T07:30:23.000 INFO - 12345 - SQL - sqlQuery1 - 12
2023-05-17T07:40:00.000 INFO - 12345 - TASK_START - start
2023-05-17T07:40:23.000 INFO - 12345 - SQL - sqlQuery1 - 11

I shifted the log lines to 23-second gaps. In this case I expected the timeout to be detected at the second line, but, just as in the log Badger posted, the timeout actually fired at the third line.

I had assumed the timeout was evaluated on every event; is there something that triggers the evaluation, such as map_action => "create_or_update"?

I cannot work out why the timeout is detected at the third line but not at the second.

I would appreciate an answer.

The second line goes through the second aggregate filter, and that does not have any timeout options set, so it does not do timeout processing. You can only have timeout options on one aggregate filter. If you try to add them to the second aggregate you will get an error.

We could change things to use a single aggregate

    aggregate {
        task_id => "%{taskid}"
        timeout_timestamp_field => "event_time"
        timeout_task_id_field => "taskid"
        code => '
            map["sql_duration"] ||= 0
            d = event.get("duration")
            if d
                map["sql_duration"] += d
            end
        '
        timeout => 20
        push_map_as_event_on_timeout => true
    }

This will trigger the timeout on the second line, but timeout processing occurs before the code block executes, so the event that is created at that time will have sql_duration set to 0. Another event created by a later timeout will have the correct value in it.
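A minimal Ruby sketch of that ordering (an illustrative model with made-up names, not the plugin's actual code): the timeout check runs before the `code` block, so a forced flush always emits the map as it was before the current event contributed anything.

```ruby
require 'time'

TIMEOUT = 20
maps    = {}
flushed = []

process = lambda do |taskid, t, duration|
  m = maps[taskid]
  if m && t - m[:last_seen] > TIMEOUT        # step 1: timeout check
    flushed << { "sql_duration" => m[:sql_duration] }
    maps.delete(taskid)
  end
  m = maps[taskid] ||= { last_seen: t, sql_duration: 0 }
  m[:last_seen] = t
  m[:sql_duration] += duration if duration   # step 2: the `code` block
end

process.call("12345", Time.parse("2023-05-17T07:30:00Z"), nil)
process.call("12345", Time.parse("2023-05-17T07:30:23Z"), 12) # 23 s gap

p flushed # => [{"sql_duration"=>0}] -- the 12 landed in the fresh map
```

The 23-second gap forces the old map (still holding 0) out first, and only then does the SQL event's duration of 12 get added to a brand-new map, which is emitted by a later timeout.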
