Watcherの実装内容に関して

※9/25 一部内容を変更しました。

お世話になります。
Watcherの実装に関して、質問させて頂きます。

▼inputのindex
・A_indexとB_indexの紐づき項目は「tenantId」
・tenantIdにaddressは一意

A_index
_id tenantId eventid memo
1 001 111 test1
2 002 111 test2
B_index
id tenantid address
1 001 aaa@test.com
2 002 bbb@test.com

▼質問内容
A_indexに対象の「eventid」が発生した場合、
action句にてA_indexのinput取得件数分、webhookを実行する際に以下を実現したいです。
実現方法について、ご教示頂けますでしょうか。

​"webhook"の"body"

"to_address"
inputのfirstの「tenantId」に紐づく、secondの"address"を設定

例:tenantIdが"001"の場合は、"aaa@test.com",
tenantIdが"002"の場合は、"bbb@test.com"を設定

"body"
inputのfirstの「tenantId」に紐づく検出イベント:{{_source.memo}}を設定
​​例:tenantIdが"001"の場合は、"test1"
tenantIdが"002"の場合は、"test2"を設定

▼現段階のWatcher実装内容

{
  "trigger": {
    "schedule": {
      "interval": "2m"
    }
  },
  "input": {
    "chain": {
      "inputs": [
        {
          "first": {
            "search": {
              "request": {
                "search_type": "query_then_fetch",
                "indices": [
                  "A_index"
                ],
                "rest_total_hits_as_int": true,
                "body": {
                  "size": 10,
                  "query": {
                    "bool": {
                      "must": [
                        {
                          "range": {
                            "@timestamp": {
                              "gte": "{{ctx.trigger.scheduled_time}}||-2m"
                            }
                          }
                        },
                        {
                          "term": {
                            "eventid": "111"
                          }
                        }
                      ]
                    }
                  }
                }
              }
            }
          }
        },
        {
          "second": {
            "search": {
              "request": {
                "search_type": "query_then_fetch",
                "indices": [
                  "B_index"
                ],
                "rest_total_hits_as_int": true,
                "body": {
                  "size": 100,
                  "query": {
                    "match_all": {}
                  }
                }
              }
            }
          }
        }
      ]
    }
  },
  "condition": {
    "script": {
      "source": "return ctx.payload.first.hits.total > 0 && ctx.payload.second.hits.total > 0",
      "lang": "painless"
    }
  },
  "actions": {
    "aaa": {
      "throttle_period_in_millis": 600000,
      "foreach": "ctx.payload.first.hits.hits", 
      "max_iterations": 500,
      "webhook": {
        "scheme": "https",
        "host": "XXXXXXXXXX",
        "port": 111,
        "method": "post",
        "path": "XXXXXX",
        "params": {},
        "headers": {
          "X-Api-Key": "XXXXXXXXXXX",
          "Content-Type": "application/json"
        },
        "body": """{
"to_address": "{{ctx.payload.second.hits.hits._source.address}}",
"subject": "test",
"body": "{{#ctx.payload.first.hits.hits.0}}***********2.イベント発生内容*************
検出イベント:{{_source.memo}}
*********************************{{/ctx.payload.first.hits.hits.0}}
"
}"""
      }
    }
  }
}

詳細な質問で恐縮ですが、ご教示頂けますと幸いです。
宜しくお願い致します。

@harue

A, B 両方のインデックスを一度に search して、 filter アグリゲーションにより分類して、擬似的に JOIN を実行してみました。以下でやりたいことはできそうですか?参考になればよいのですが。

サンプルデータ

POST forum_249760_a/_bulk
{"index": {"_id": "1"}}
{"tenantId": "001", "eventId": "111", "memo": "test1"}
{"index": {"_id": "2"}}
{"tenantId": "002", "eventId": "111", "memo": "test2"}

POST forum_249760_b/_bulk
{"index": {"_id": "1"}}
{"tenantId": "001", "address": "aaa@test.com"}
{"index": {"_id": "2"}}
{"tenantId": "002", "address": "bbb@test.com"}

Watcher 定義
テスト目的で log アクションを使っていますが、 Webhook でも同じだと思います。

{
  "trigger": {
    "schedule": {
      "interval": "30m"
    }
  },
  "input": {
    "search": {
      "request": {
        "search_type": "query_then_fetch",
        "indices": [
          "forum_249760_*"
        ],
        "rest_total_hits_as_int": true,
        "body": {
          "size": 0,
          "aggs": {
            "tenants": {
              "terms": {
                "field": "tenantId.keyword",
                "size": 10
              },
              "aggs": {
                "events": {
                  "filter": {
                    "exists": {
                      "field": "eventId"
                    }
                  },
                  "aggs": {
                    "data": {
                      "top_hits": {
                        "size": 1,
                        "_source": [
                          "eventId",
                          "memo"
                        ]
                      }
                    }
                  }
                },
                "tenants": {
                  "filter": {
                    "exists": {
                      "field": "address"
                    }
                  },
                  "aggs": {
                    "data": {
                      "top_hits": {
                        "size": 1,
                        "_source": "address"
                      }
                    }
                  }
                }
              }
            }
          }
        }
      }
    }
  },
  "condition": {
    "compare": {
      "ctx.payload.hits.total": {
        "gt": 0
      }
    }
  },
  "actions": {
    "log_hits": {
      "foreach": "ctx.payload.aggregations.tenants.buckets",
      "max_iterations": 500,
      "logging": {
        "text": "tenantId={{ctx.payload.key}}, address={{ctx.payload.tenants.data.hits.hits.0._source.address}}, memo={{ctx.payload.events.data.hits.hits.0._source.memo}}, eventId={{ctx.payload.events.data.hits.hits.0._source.eventId}}"
      }
    }
  }
}

Watcher 結果

次の二つが2回の action の実行結果として出力されます。
必要な情報は出力されていると思いますが、A_index で同一 tenantId について複数のイベントが発生している場合はどうなるのかな? とか思いました:

tenantId=001, address=aaa@test.com, memo=test1, eventId=111
tenantId=002, address=bbb@test.com, memo=test2, eventId=111

回答頂きありがとうございます。
上記サンプルデータ・Watcher定義にて、action句をwebhookに変更し試したところ
"to_address"と"body"の内容を出力することができました。

"actions": {
    "log_hits": {
      "foreach": "ctx.payload.aggregations.tenants.buckets",
      "max_iterations": 500,
      "webhook": {
        "scheme": "https",
        "host": "XXXXXXXXXX",
        "port": 111,
        "method": "post",
        "path": "XXXXXX",
        "params": {},
        "headers": {
          "X-Api-Key": "XXXXXXXXXXX",
          "Content-Type": "application/json"
        },
      """{
        "to_address": "{{ctx.payload.tenants.data.hits.hits.0._source.address}}",
        "subject": "test",
        "body": "memo:{{ctx.payload.events.data.hits.hits.0._source.memo}}" 
    }"""
      }
    }
  }

お手数ですが、追加のご質問をさせて頂きます。

①A_index で同一 tenantId について複数のイベントが発生している場合について

以下データを追加したところ、Watcher結果は回答頂いた2件と同様になり、
tenantId=001, address=aaa@test.com, memo=test3のデータは出力されませんでした。

POST forum_249760_a/_bulk
{"index": {"_id": "3"}}
{"tenantId": "001", "eventId": "111", "memo": "test3"}

actions句のforeachの定義内容を工夫すれば同一 tenantId について複数のイベントを発生させることが出来るようになると思いますが、分かっておりません。
恐れ入りますが、実現方法があるか回答を頂けますでしょうか?

➁POST forum_249760_aに@timestampを追加しinput句の条件に設定する場合

inputデータを以下とします。

POST forum_249760_a/_bulk
{"index": {"_id": "1"}}
{"tenantId": "001", "eventId": "111", "memo": "test1","@timestamp": "2020-10-05T13:44:00.510+09:00"}
{"index": {"_id": "2"}}
{"tenantId": "002", "eventId": "111", "memo": "test2","@timestamp": "2020-10-05T13:44:00.510+09:00"}

POST forum_249760_b/_bulk
{"index": {"_id": "1"}}
{"tenantId": "001", "address": "aaa@test.com"}
{"index": {"_id": "2"}}
{"tenantId": "002", "address": "bbb@test.com"}

trigger句,input句のWatcher定義
→トリガーを2分間隔、
input句にqueryを追加しforum_249760_aの@timestampが起動時間から2分以内のデータのみを対象としたいのですが、Watcher結果にforum_249760_bのaddressは取得できず、以下エラーが発生します。(inputにて取得出来ていないからエラーが発生していると思われます)

queryをforum_249760_aとforum_249760_b両方が対象になっているため、
エラーになってしまうのですが、forum_249760_aのみをqueryの条件とするためには、どのように設定すれば良いかご教示頂けますでしょうか?

{
  "trigger": {
    "schedule": {
      "interval": "2m"
    }
  },
      "input": {
        "search": {
          "request": {
            "search_type": "query_then_fetch",
            "indices": [
              "forum_249760_*"
            ],
            "rest_total_hits_as_int": true,
            "body": {
              "size": 0,
              "query": {
                "range": {
                  "@timestamp": {
                    "gte": "{{ctx.trigger.scheduled_time}}||-2m"
                  }
                }
              },
              "aggs": {
                "tenants": {
                  "terms": {
                    "field": "tenantId.keyword",
                    "size": 10
                  },
                  "aggs": {
                    "events": {
                      "filter": {
                        "exists": {
                          "field": "eventId"
                        }
                      },
                      "aggs": {
                        "data": {
                          "top_hits": {
                            "size": 10,
                            "_source": [
                              "eventId",
                              "memo"
                            ]
                          }
                        }
                      }
                    },
                    "tenants": {
                      "filter": {
                        "exists": {
                          "field": "address"
                        }
                      },
                      "aggs": {
                        "data": {
                          "top_hits": {
                            "size": 10,
                            "_source": "address"
                          }
                        }
                      }
                    }
                  }
                }
              }
            }
          }
        }
      },

Watcher結果(抜粋)

"actions": [
      {
        "id": "log_hits",
        "type": "webhook",
        "status": "failure",
        "error": {
          "root_cause": [
            {
              "type": "general_script_exception",
              "reason": "Error running com.github.mustachejava.codes.DefaultMustache@16eb4a47"
            }
          ],
          "type": "general_script_exception",
          "reason": "Error running com.github.mustachejava.codes.DefaultMustache@16eb4a47",
          "caused_by": {
            "type": "mustache_exception",
            "reason": "Failed to get value for ctx.payload.tenants.data.hits.hits.0._source.address @[query-template:2]",
            "caused_by": {
              "type": "mustache_exception",
              "reason": "0 @[query-template:2]",
              "caused_by": {
                "type": "index_out_of_bounds_exception",
                "reason": "0"
              }
            }
          }
        }
      }
    ]

再度質問してしまい申し訳ありませんが、回答頂けますと幸いです。
以上、宜しくお願い致します。

以下の様なクエリにしてboolクエリの条件を「日付範囲かアドレスがある」にすれば、強引に @timestamp の無いインデックスの方からもドキュメントを引っ張って来れますが、なかなか厳しいですね。

  "query": {
    "bool": {
      "minimum_should_match": 1,
      "should": [
        {
          "exists": {
            "field": "address"
          }
        },
        {
          "range": {
            "@timestamp": {
              "gte": "2020-10-05T12:44:00.510+09:00",
              "lte": "2020-10-05T13:44:00.510+09:00"
            }
          }
        }
      ]
    }
  }

ここまでくると、Watcher のクエリで JOIN ではなく、index_a のデータ投入時に enrich processor を使って index_b のアドレスをエンリッチしてやるのが良さそうです。

返信が遅れ申し訳ありません。

回答頂きありがとうございます。
参考にさせて頂きます。