Watcher Index Actionで、ドキュメントを分けて出力する

基礎的な質問で恐縮ですが、WatcherのIndex Actionを利用すると、デフォルトでは、集計結果が、一つのドキュメントにまとめられて出力されます。
これを、Watcherで引っかかったドキュメントが、そのまま別々の状態で出力されるように設定をしたいです。

たとえば、
①下記のような3つのデータを用意

*doc1
pc_name : A
feature : B
*doc2
pc_name : A
feature : C
*doc3
pc_name : B
feature : B

②Watcherで、FeatureにBが含まれるときに、Indexを作成するように設定
③doc1とdoc3が、一つのインデックスに、別々のドキュメントして出力
という流れをつくりたいです

初歩的なミスなのかもしれませんが、、下記の通り、Transform句を使用して作成してみたのですが、「no requests added」というエラーが出てしまいました。

{
  "trigger": {
    "schedule": {
      "interval": "2m"
    }
  },
  "input": {
    "search": {
      "request": {
        "search_type": "query_then_fetch",
        "indices": [
          "license_used"
        ],
        "types": [],
        "body": {
          "size": 0,
          "aggs": {
            "pc_name": {
              "terms": {
                "field": "pc_name"
              }
            },
            "start_date": {
              "terms": {
                "field": "start_date"
              }
            }
          },
          "query": {
            "bool": {
              "must": [
                {
                  "range": {
                    "start_date": {
                      "gte": "now-1M"
                    }
                  }
                },
                {
                  "terms": {
                    "feature": [
                      "*****",
                      "*****"
                    ]
                  }
                }
              ]
            }
          }
        }
      }
    }
  },
  "condition": {
    "compare": {
      "ctx.payload.hits.total": {
        "gte": 10
      }
    }
  },
  "actions": {
    "index_payload": {
      "transform": {
        "script": {
          "source": "return['_doc':ctx.payload.hits.hits]",
          "lang": "painless"
        }
      },
      "index": {
        "index": "t.watch_highuse0517",
        "doc_type": "doc"
      }
    }
  }
}

インデックスに登録したいドキュメントの配列を「_doc」に詰めることで、
複数ドキュメントのインデクシングが可能です。

"source": "return['_doc':ctx.payload.hits.hits]",

hits.hitsを詰めると、_source部分以外に余計なメタデータが入っているので失敗します。

以下の部分を、配列にすればOKです。
hits.hits[0]._source
hits.hits[1]._source

具体的なサンプルは、こちらのブログにありますので、
ご参考に。
https://www.shin0higuchi.com/entry/2017/11/02/002955

1 Like

早速のご回答ありがとうございます。
上記のブログは私も拝見したのですが、

"script": {
"source": "return['_doc':ctx.payload.hits.hits]",
"lang": "painless"
}

"script": "for (int i = 0; i < ctx.payload.hits.hits.length; ++i) {ctx.payload.hits.hits[i] = ctx.payload.hits.hits[i]._source } return [ '_doc' : ctx.payload.hits.hits]"

に変えて実行すると、
"Validation Failed: 1: no requests added;"
と出てしまい、うまくいかない状況です。。

bodyの先頭に
size=0
と指定しているため、indexingする_sourceが取得できていないのが原因だと思います。

一度、body内のJSONをコピーしてsearch APIを叩いてみるのがおすすめです。

1 Like

size0で検索した際のレスポンスは下記のように、hitsが空配列になりますが、hits.totalは取得するので、conditionにマッチしてしまいます。

{
"took" : 8,
"timed_out" : false,
"_shards" : {
"total" : 1,
"successful" : 1,
"skipped" : 0,
"failed" : 0
},
"hits" : {
"total" : {
"value" : 2,
"relation" : "eq"
},
"max_score" : null,
"hits" :
},
"aggregations" : {
"term" : {
"doc_count_error_upper_bound" : 0,
"sum_other_doc_count" : 0,
"buckets" : [
{
"key" : 56,
"doc_count" : 1
},
{
"key" : 66,
"doc_count" : 1
}
]
}
}
}

1 Like

ありがとうございます。
size=1000
にしたところ、目的の動作が実現できました!
大変助かりました!

なお、検索ロジックをこれから作成していくため、
そちらが完了するまでは、こちらの質問チケットはオープンのままにさせていただきます。

2 Likes

たびたび失礼いたします、
出力自体はできたのですが、
検索・抽出ロジックで不明点があり、追加で質問させていただきます。
かなり簡略化していますが、下記のような流れで抽出を行おうとしています。

①下記のようなデータを用意

*doc1
pc_name : A
feature : B
start_date: 2019/5/18
login_date: 2019/5/19

*doc2
pc_name : A
start_date: 2019/5/18
login_date: 2019/5/22

*doc3
pc_name : B
start_date: 2019/5/21
login_date: 2019/5/20

*doc4
pc_name : B
start_date: 2019/5/21
login_date: 2019/5/22

②下記のようなロジックで出力
(1) PC Nameとstart_dateのセットをキーとしてグルーピング
→この場合、doc1とdoc2がグループ、doc3とdoc4がグループになる。
(2) 上記のグループごとに、start_date と login_dateの差を算出
(3) 1グループあたり、start_date と login_dateの差が一番大きいドキュメントのみをインデキシングする((1)のグループ数=レコード数になる)

③出力結果のイメージ
→上記データのうち、doc2、doc4が出力される

[0]
  pc_name : A
  feature : C
  start_date: 2019/5/18
  login_date: 2019/5/22

[1]
pc_name : B
feature : B
   start_date: 2019/5/21
   login_date: 2019/5/22

現状は、ヒットしたレコードを、ソートして、上位50個を抜き出す、という風にして、それとなく絞っていますが、必要なのは同じFeature・Startdateでグループ化されたレコードのうち、Logindateがもっとも大きいものだけになるため、単に並び替えるだけでなく「Max以外抽出しない」という
ロジックが組みたいです。

※下記が実際の使用しているコードです

{
  "trigger": {
    "schedule": {
      "interval": "1h"
    }
  },
  "input": {
    "search": {
      "request": {
        "search_type": "query_then_fetch",
        "indices": [
          "license_used"
        ],
        "types": [],
        "body": {
          "size": 50,
          "aggregations": {
            "2": {
              "terms": {
                "field": "pc_name",
                "size": 1000,
                "order": {
                  "1": "desc"
                }
              },
              "aggs": {
                "1": {
                  "max": {
                    "field": "login_date"
                  }
                },
                "3": {
                  "terms": {
                    "field": "feature",
                    "size": 2,
                    "order": {
                      "1": "desc"
                    }
                  },
                  "aggs": {
                    "1": {
                      "max": {
                        "field": "login_date"
                      }
                    },
                    "5": {
                      "terms": {
                        "field": "start_date",
                        "size": 1000,
                        "order": {
                          "1": "desc"
                        }
                      },
                      "aggs": {
                        "1": {
                          "max": {
                            "field": "login_date"
                          }
                        }
                      }
                    }
                  }
                }
              }
            }
          },
          "query": {
            "bool": {
              "must": [
                {
                  "range": {
                    "login_date": {
                      "gte": "now-1M"
                    }
                  }
                },
                {
                  "terms": {
                    "feature": [
                      "*****",
                      "*****"
                    ]
                  }
                },
                {
                  "script": {
                    "script": {
                      "lang": "painless",
                      "source": "(doc['login_date'].date.getMillis()/1000 - doc['start_date'].date.getMillis()/1000) > 172800"
                    }
                  }
                }
              ]
            }
          }
        }
      }
    }
  },
  "condition": {
    "compare": {
      "ctx.payload.hits.total": {
        "gte": 10
      }
    }
  },
  "actions": {
    "index_payload": {
      "transform": {
        "script": {
          "source": "for (int i = 0; i < ctx.payload.hits.hits.length; ++i) {ctx.payload.hits.hits[i] = ctx.payload.hits.hits[i]._source }  return [ '_doc' : ctx.payload.hits.hits]",
          "lang": "painless"
        }
      },
      "index": {
        "index": "t.watch_highuse0517h",
        "doc_type": "doc"
      }
    }
  }
}

(1) PC Nameとstart_dateのセットをキーとしてグルーピング

必要なのは同じFeature・Startdateでグループ化されたレコードのうち、Logindateがもっとも大きいものだけになるため、

キーにするのが以下のどちらか分からなかったので、
①pc_name + start_date
②feature + start_date
とりあえず、①と仮定します。

以下の要件だとすると、
・pc_name + start_dateでグルーピング
・login_dateが最大のレコードを抽出する

クエリは以下のように書いてはどうでしょうか。

# データ投入とクエリ
DELETE test

PUT test
{
  "mappings": {
    "properties": {
      "pc_name": {
        "type": "keyword"
      },
      "feature": {
        "type": "keyword"
      },
      "login_date": {
        "type": "date",
        "format": "yyyy/M/dd"
      },
      "start_date": {
        "type": "date",
        "format": "yyyy/M/dd"
      }
    }
  },
  "settings": {
    "index": {
      "number_of_shards": "1",
      "number_of_replicas": "0"
    }
  }
}

PUT test/_doc/1
{
  "pc_name": "A",
  "feature": "X-1",
  "start_date": "2019/5/18",
  "login_date": "2019/5/19"
}

PUT test/_doc/2
{
  "pc_name": "A",
  "feature": "X-2",
  "start_date": "2019/5/18",
  "login_date": "2019/5/22"
}

PUT test/_doc/3
{
  "pc_name": "B",
  "feature": "X-3",
  "start_date": "2019/5/21",
  "login_date": "2019/5/21"
}

PUT test/_doc/4
{
  "pc_name": "B",
  "feature": "X-4",
  "start_date": "2019/5/21",
  "login_date": "2019/5/22"
}

GET test/_search
{
  "size": 0,
  "aggs": {
    "unique_key": {
      "composite": {
        "sources": [
          {
            "pc_name": {
              "terms": {
                "field": "pc_name"
              }
            }
          },
          {
            "start_date": {
              "terms": {
                "field": "start_date"
              }
            }
          }
        ]
      },
      "aggs": {
        "latest_login": {
          "top_hits": {
            "sort": [
              {
                "login_date": {
                  "order": "desc"
                }
              }
            ],
            "size": 1
          }
        }
      }
    }
  }
}

実行結果

#レスポンス
{
  "took" : 754,
  "timed_out" : false,
  "_shards" : {
    "total" : 1,
    "successful" : 1,
    "skipped" : 0,
    "failed" : 0
  },
  "hits" : {
    "total" : {
      "value" : 4,
      "relation" : "eq"
    },
    "max_score" : null,
    "hits" : [ ]
  },
  "aggregations" : {
    "unique_key" : {
      "after_key" : {
        "pc_name" : "B",
        "start_date" : 1558396800000
      },
      "buckets" : [
        {
          "key" : {
            "pc_name" : "A",
            "start_date" : 1558137600000
          },
          "doc_count" : 2,
          "latest_login" : {
            "hits" : {
              "total" : {
                "value" : 2,
                "relation" : "eq"
              },
              "max_score" : null,
              "hits" : [
                {
                  "_index" : "test",
                  "_type" : "_doc",
                  "_id" : "2",
                  "_score" : null,
                  "_source" : {
                    "pc_name" : "A",
                    "feature" : "X-2",
                    "start_date" : "2019/5/18",
                    "login_date" : "2019/5/22"
                  },
                  "sort" : [
                    1558483200000
                  ]
                }
              ]
            }
          }
        },
        {
          "key" : {
            "pc_name" : "B",
            "start_date" : 1558396800000
          },
          "doc_count" : 2,
          "latest_login" : {
            "hits" : {
              "total" : {
                "value" : 2,
                "relation" : "eq"
              },
              "max_score" : null,
              "hits" : [
                {
                  "_index" : "test",
                  "_type" : "_doc",
                  "_id" : "4",
                  "_score" : null,
                  "_source" : {
                    "pc_name" : "B",
                    "feature" : "X-4",
                    "start_date" : "2019/5/21",
                    "login_date" : "2019/5/22"
                  },
                  "sort" : [
                    1558483200000
                  ]
                }
              ]
            }
          }
        }
      ]
    }
  }
}

これを、Watcherのactionsの中のtransformでうまく_docに詰めてみてください。

1 Like

ありがとうございます。
作動できました。

なお、説明を簡略化するために”Feature・Startdateでグループ化”と言いましたが、実際にはFeature/ start_date/ pc_nameでグループ化になります。

2点、課題・疑問がございます。

①Transformでhits.hitsに入れ込むには"size"を指定しなければならず、結果的に、検索上位でないドキュメントも抽出されてしまう

"size"("body"の直下、位置でいうと下記コードの"aggs"の上の行の"size")を指定しない方法、あるいは曖昧にしたり、変数にしたりする方法ってないですよね、、

②Feature/ start_date/ pc_nameをユニークキーにしたのですが、同じFeature/ start_date/ pc_name
で、Logindateの異なるドキュメントが2,3個ほど検索最上位に入り込んでしまう
ドキュメントを懸命に読んでおりますが、こちらはさっぱり原因がわかりません。。

以下、使用しているコードです。

{
  "trigger": {
    "schedule": {
      "interval": "3d"
    }
  },
  "input": {
    "search": {
      "request": {
        "search_type": "query_then_fetch",
        "indices": [
          "license_used"
        ],
        "types": [],
        "body": {
          "query": {
            "bool": {
              "must": [
                {
                  "range": {
                    "login_date": {
                      "gte": "now-1M"
                    }
                  }
                },
                {
                  "terms": {
                    "feature": [
                      "*****",
                      "*****"
                    ]
                  }
                },
                {
                  "script": {
                    "script": {
                      "lang": "painless",
                      "source": "(doc['login_date'].date.getMillis()/1000 - doc['start_date'].date.getMillis()/1000) > 172800"
                    }
                  }
                }
              ]
            }
          },
          "size": 100,
          "aggs": {
            "unique_key": {
              "composite": {
                "sources": [
                  {
                    "pc_name": {
                      "terms": {
                        "field": "pc_name"
                      }
                    }
                  },
                  {
                    "start_date": {
                      "terms": {
                        "field": "start_date"
                      }
                    }
                  },
                  {
                    "feature": {
                      "terms": {
                        "field": "feature"
                      }
                    }
                  }
                ]
              },
              "aggs": {
                "latest_login": {
                  "top_hits": {
                    "sort": [
                      {
                        "login_date": {
                          "order": "desc"
                        }
                      }
                    ],
                    "size": 1
                  }
                }
              }
            }
          }
        }
      }
    }
  },
  "condition": {
    "compare": {
      "ctx.payload.hits.total": {
        "gte": 1
      }
    }
  },
  "actions": {
    "index_payload": {
      "transform": {
        "script": {
          "source": "for (int i = 0; i < ctx.payload.hits.hits.length; ++i) {ctx.payload.hits.hits[i] = ctx.payload.hits.hits[i]._source }  return [ '_doc' : ctx.payload.hits.hits]",
          "lang": "painless"
        }
      },
      "index": {
        "index": "t.watch_highuse0517k",
        "doc_type": "doc"
      }
    }
  }
}

言葉足らずでうまく伝わらなかったようですみません。

sizeは0のままでよいです。
私が先ほど投稿したクエリに対して、
compositはFeature/ start_date/ pc_nameの3つに変更したものを、
Watcherで実行してください。

Conditionは、
ctx.payload.aggregations.unique_key.buckets
の配列要素が1個以上かどうか。

actionsのtransformで、
ctx.payload.aggregations.unique_key.buckets
をループで回して、
各要素の中の
latest_login.hits.hits[0]._source
を取得して、_docに詰めるとよいと思います。

1 Like

ありがとうございます。
教えていただいた内容で、動作できました。

ただ、unique_key.bucketsが、想定では100件ほどになるはずなのですが、毎回必ず10件しかひっかかりません(⇒ ctx.payload.aggregations.unique_key.buckets.length10)。
検索条件を多少変えても、やはり10件しかひっかからず、構文が悪いのか、Watcherの仕様なのか、データが悪いのかの切り分けが分からず。。。

何度も申し訳ございませんが、こちら教えていただけますでしょうか。

もし、データが下記のような3件であれば
ctx.payload.aggregations.unique_key.buckets.lengthは、3になる認識であっているか
(同じ要領でデータ件数が10件以上になった場合は、当然、bucketsの数も10個以上になる認識であっているか)

_doc/1
Feature A
start_date A
pc_name B

_doc/2
Feature A
start_date B
pc_name A

_doc/3
Feature B
start_date A
pc_name A

※現在使用しているコード

{
  "trigger": {
    "schedule": {
      "interval": "1d"
    }
  },
  "input": {
    "search": {
      "request": {
        "search_type": "query_then_fetch",
        "indices": [
          "license_used
        ],
        "types": [],
        "body": {
          "query": {
            "bool": {
              "must": [
                {
                  "range": {
                    "login_date": {
                      "gte": "now-1M"
                    }
                  }
                },
                {
                  "terms": {
                    "feature": [
                      "*****",
                      "*****"
                    ]
                  }
                },
                {
                  "script": {
                    "script": {
                      "lang": "painless",
                      "source": "(doc['login_date'].date.getMillis()/1000 - doc['start_date'].date.getMillis()/1000) > 172800"
                    }
                  }
                }
              ]
            }
          },
          "size": 0,
          "aggs": {
            "unique_key": {
              "composite": {
                "sources": [
                  {
                    "pc_name": {
                      "terms": {
                        "field": "pc_name"
                      }
                    }
                  },
                  {
                    "start_date": {
                      "terms": {
                        "field": "start_date"
                      }
                    }
                  },
                  {
                    "feature": {
                      "terms": {
                        "field": "feature"
                      }
                    }
                  }
                ]
              },
              "aggs": {
                "latest_login": {
                  "top_hits": {
                    "sort": [
                      {
                        "login_date": {
                          "order": "desc"
                        }
                      }
                    ],
                    "size": 1
                  }
                }
              }
            }
          }
        }
      }
    }
  },
  "condition": {
    "compare": {
      "ctx.payload.hits.total": {
        "gte": 1
      }
    }
  },
  "actions": {
    "index_payload": {
      "transform": {
        "script": {
          "source": "for (int i = 0; i < ctx.payload.aggregations.unique_key.buckets.length; ++i) {ctx.payload.aggregations.unique_key.buckets[i] = ctx.payload.aggregations.unique_key.buckets[i].latest_login.hits.hits[0]._source }  return [ '_doc' : ctx.payload.aggregations.unique_key.buckets]",
          "lang": "painless"
        }
      },
      "index": {
        "index": "t.watch_highuse0527",
        "doc_type": "doc"
      }
    }
  }
}

こちらは認識合っていると思います。

下記のように、aggregation側でsizeを指定する必要があると思います。デフォルト値は忘れましたが、おそらく10件なのでしょう。

"composite": {
    "size": 100,
    "sources": [
          {
               略
          }     
    ]
}

参考 : Composite aggregation | Elasticsearch Guide [8.11] | Elastic

1 Like

そうですね。
shin-higuchiさんの書いた通り、
compositeにsizeを指定すればOKです :slightly_smiling_face:

sizeのデフォルトは10です。
https://www.elastic.co/guide/en/elasticsearch/reference/6.7/search-aggregations-bucket-composite-aggregation.html#_size

2 Likes

yoshiokaさん、shin-higuchiさん、

compositeのSizeを指定したところ、うまくいきました!
瞬殺ですね・・・笑

迅速に、かつ、本当に細かいところまで教えていただき、ありがとうございました。

1 Like

WatcherのIndex Actionで、Index名を可変にする方法がないか

This topic was automatically closed 28 days after the last reply. New replies are no longer allowed.