Watcherで割合をしきい値にしたり、出力したりする方法

現在nginxのアクセスログを蓄積しており過去n分間の取引のうち処理時間(duration.rpが当たります)m秒以上がx%以上だったら警告というような運用を考えています。

データとして以下のようにn秒以上の件数をすべて抜き出し0秒以上の全件で割れば算出できるのではと考えましたがctx.payload.aggregations.duration_sla.bucketsの個別の要素へのアクセス方法と計算処理をどこに書けばよいかがいまいちわかっておらず、教えていただきたいです。

bucketsについては右記のようなアクセスを試して失敗しています。ctx.payload.aggregations.duration_sla.buckets['0.0-*']

{
  "trigger": {
    "schedule": {
      "interval": "1m"
    }
  },
  "input": {
    "search": {
      "request": {
        "search_type": "query_then_fetch",
        "indices": [ 
          <<index>>
         ],
        "types": [ ],
        "body": {
          "query": {
            "bool": {
              "must": [
                {
                  "match_all": {}
                },
                {
                  "range": {
                    "timestamp.timefilter": {
                      "gte": "now-{{ctx.metadata.time_window}}"
                    }
                  }
                }
              ],
              "filter": [ ],
              "should": [],
              "must_not": []
            }
          },
          "aggs": {
            "duration_sla": {
              "range": {
                "field": "duration.rp",
                "ranges": [
                  {
                    "from": 0
                  },
                  {
                    "from": 3
                  },
                  {
                    "from": 5
                  }
                ]
              }
            }
          }
        }
      }
    }
  },
  "transform": {
    ここで割合を算出?
  },
  "condition": {
    ここで割合のしきい値超過を判定
  },
  "actions": {
    "email_me": {
      "email": {
        "profile": "standard",
        "attachments": {
          "<<filename>>": {
            "reporting": {
              "url": "<<URL>>",
              "retries": 6,
              "interval": "20s",
              "auth": {
                "basic": {
                  "username": "<<id>>",
                  "password": "<<pass>>"
                }
              }
            }
          }
        },
        "from": "<<mail>>",
        "to": [
          "<<mail>>"
        ],
        "cc": [],
        "subject": "<<subject>>",
        "body": {
          "html": "{{#ctx.payload.aggregations.duration_sla.buckets}}{{key}}:({{doc_count}}):{{/ctx.payload.aggregations.duration_sla.buckets}}"
        }
      }
    }
  },
  "metadata": {
    "time_window": "2m"
  }
}

@hashimoto さん、

  • トップレベルのtransformconditionの評価が終わった後、actionへ引き渡す際の変換です。
  • searchの出力を変換するにはchainインプットを使う必要があります。
  • chainインプットの結果にアクセスするには各インプットの名前が必要です。ctx.payload.first.aggregationsなど。

こちら、試しに作ってみたのでご参考までに。

サンプルデータ:

POST forum_233636/_bulk
{"index": {}}
{"@timestamp": "2020-05-21T13:55:01+09:00", "duration.rp": 1}
{"index": {}}
{"@timestamp": "2020-05-21T13:55:02+09:00", "duration.rp": 1}
{"index": {}}
{"@timestamp": "2020-05-21T13:55:03+09:00", "duration.rp": 3}
{"index": {}}
{"@timestamp": "2020-05-21T13:55:04+09:00", "duration.rp": 4}
{"index": {}}
{"@timestamp": "2020-05-21T13:55:05+09:00", "duration.rp": 5}
{"index": {}}
{"@timestamp": "2020-05-21T13:55:06+09:00", "duration.rp": 7}
{"index": {}}
{"@timestamp": "2020-05-21T13:55:07+09:00", "duration.rp": 11}
{"index": {}}
{"@timestamp": "2020-05-21T13:55:08+09:00", "duration.rp": 1}
{"index": {}}
{"@timestamp": "2020-05-21T13:55:09+09:00", "duration.rp": 1}
{"index": {}}
{"@timestamp": "2020-05-21T13:55:10+09:00", "duration.rp": 1}

Watcher定義:

{
  "trigger": {
    "schedule": {
      "interval": "30m"
    }
  },
  "input": {
    "chain": {
      "inputs": [
        {
          "first": {
            "search": {
              "request": {
                "search_type": "query_then_fetch",
                "indices": [
                  "forum_233636"
                ],
                "body": {
                  "size": 0,
                  "query": {
                    "match_all": {}
                  },
                  "aggs": {
                    "duration_sla": {
                      "range": {
                        "field": "duration.rp",
                        "ranges": [
                          {
                            "from": 0
                          },
                          {
                            "from": 3
                          },
                          {
                            "from": 5
                          }
                        ]
                      }
                    }
                  }
                }
              }
            }
          }
        },
        {
          "counts": {
            "transform": {
              "script": """
         return ['all': ctx.payload.first.aggregations.duration_sla.buckets[0].doc_count,
         'gte3ms': ctx.payload.first.aggregations.duration_sla.buckets[1].doc_count,
         'gte5ms': ctx.payload.first.aggregations.duration_sla.buckets[2].doc_count]
      """
            }
          }
        }
      ]
    }
  },
  "condition": {
    "script": {
      "source": """
        return true || ((float) ctx.payload.counts.gte3ms / ctx.payload.counts.all) > 0.5
                || ((float) ctx.payload.counts.gte5ms / ctx.payload.counts.all) > 0.2;
        """
    }
  },
  "actions": {
    "my-logging-action": {
      "logging": {
        "level": "info",
        "text": "{{ctx.payload.counts}}"
      }
    }
  }
}