【ingest node】enrich processorに関して

お世話になります。

ingest nodeのenrich processorに関して、質問させて頂きます。
(下記問い合わせに関連したご質問になります。)

▼実現したいこと

A indexの投入データの「A_id」が"1111"の場合

①C index
「A_id」の値を条件に、「B_id」の値(複数有)を取得

C index
_id A_id B_id
1 1111 aaaa
2 1111 bbbb

➁B index
①で取得した「B_id」の値(複数有)を条件に、各値に紐づく「B_comennt」の値を取得

B index
_id B_id B_comennt
1 aaaa test1
2 bbbb test2

③A index
「A_id」,②で取得した「B_id」,「B_comennt」を登録
(「B_id」,「B_comennt」はarrayで1ドキュメントで登録)

A index
_id A_id B_id B_comennt
1 1111 aaaa,bbbb test1,test2

▼検証

補完用のデータの準備

POST index_c/_doc/1
{
  "a_id": "1111",
  "b_id": "aaaa"
}

POST index_c/_doc/2
{
  "a_id": "1111",
  "b_id": "bbbb"
}


POST index_b/_doc/1
{
  "b_id": "aaaa",
  "b_comment": "test1"
}

POST index_b/_doc/2
{
  "b_id": "bbbb",
  "b_comment": "test2"
}

Enrich Policyの作成

# index_cに対してd_idをもとに、f_idを付与する
PUT /_enrich/policy/index-c-policy
{
  "match": {
    "indices": "index_c",
    "match_field": "a_id",
    "enrich_fields": ["b_id"]
  }
}

# index_bに対してb_idをもとに、b_commentを付与する
PUT /_enrich/policy/index-b-policy
{
  "match": {
    "indices": "index_b",
    "match_field": "b_id",
    "enrich_fields": ["b_comment"]
  }
}

PolicyのExecuteの実行

POST /_enrich/policy/index-c-policy/_execute
POST /_enrich/policy/index-b-policy/_execute

Ingest Pipelineの作成

PUT /_ingest/pipeline/test
{

  "processors": [
    {
      "enrich": {
        "policy_name": "index-c-policy",
        "field": "a_id",
        "target_field": "b",
        "max_matches": "2"
      }
    },
    {
      "enrich": {
        "policy_name": "index-b-policy",
        "field": "b.b_id",           ←★
        "target_field": "b",
        "max_matches": "2"
      }
    },
    { 
      "rename": {
        "field": "b.b_comment",
        "target_field": "b_comment"
      }
    },
    {
      "rename": {
        "field": "b.b_id",
        "target_field": "b_id"
      }
    },
    {
      "remove": {
        "field": "b"
      }
    }
  ]
}

データ投入

PUT /my-index-00001/_doc/1?pipeline=test
{
  "a_id": "1111"
}

データ投入後、下記エラーが発生

{
  "error" : {
    "root_cause" : [
      {
        "type" : "illegal_argument_exception",
        "reason" : "[b_id] is not an integer, cannot be used as an index as part of path [b.b_id]"
      }
    ],
    "type" : "illegal_argument_exception",
    "reason" : "[b_id] is not an integer, cannot be used as an index as part of path [b.b_id]",
    "caused_by" : {
      "type" : "number_format_exception",
      "reason" : "For input string: \"b_id\""
    }
  },
  "status" : 400
}

エラーの原因は上記★箇所であり、"b"の値が以下形式になっているためですが、

"b": [
  {
    "a_id": "1111",
    "b_id": "aaaa"
  },
  {
    "a_id": "1111",
    "b_id": "bbbb"
  }
],

正しく動作するためには、どのような記述をすれば良いでしょうか。
(そもそもですが、enrich processorで複数の値を元に紐づけることは可能でしょうか。)

お手数ですが、回答を頂けますと幸いです。
以上、宜しくお願い致します。

enrichの1回目で取得された結果がarrayになっているため、2回目のenrichのb.b_idがうまく取れないことが原因かと思います。

enrichを複数回、しかも結果が複数になるパターンが組み合わさっている前提の方を変えられないでしょうか?

具体的にいうと、今回のindex_b, index_cをみて、予めindex_dのようなものを作っておきます。
(index_cに対して、b_commentを補完するようなpipelineを設定してreindexすればindex_dが作れます)
そうすることで、enrichの回数を1回減らせて、結果的に求めたい結果に近くのではと思います。

作成されるindex_dの中身の例

         {
          "b" : [
            {
              "b_id" : "aaaa",
              "b_comment" : "test1"
            }
          ],
          "a_id" : "1111",
          "b_id" : "aaaa"
        }

このindex_dに対して作成したenrich policyを使うのはどうでしょうか。

PUT /_enrich/policy/index-d-policy
{
  "match": {
    "indices": "index_d",
    "match_field": "a_id",
    "enrich_fields": ["b_id", "b.b_comment"]
  }
}

POST /_enrich/policy/index-d-policy/_execute

index-dのPolicyを使ってindexするようにする

POST _ingest/pipeline/_simulate
{
  "pipeline": {
    "processors": [
      {
        "enrich": {
          "policy_name": "index-d-policy",
          "field": "a_id",
          "target_field": "age",
          "max_matches": "20"
        }
      }
    ]
  },
  "docs": [
    {
      "_index": "aaa",
      "_id": "id1",
      "_source": {
        "a_id": "1111",
        "@timestamp": "2020-02-06T06:44:33.758Z"
      }
    }
  ]
}

このような感じでtest1、test2の値まで入ることが確認できます。

もし、b_commentだけを抜き出したいということであれば、scriptを使うと抽出もできそうです。

ご参考になれば。

返信が遅くなり申し訳ありません。

回答頂きありがとうございます。
上記の案を参考にさせて頂きます。

This topic was automatically closed 28 days after the last reply. New replies are no longer allowed.