【ingest node】別のindexのデータを登録する方法について

お世話になります。

ingest node(またはLogstash等)にて、以下が実現できるかご教示をお願い致します。

▼実現したいこと
・ingest nodeでA indexにデータ登録する際に、B indexのデータも併せて登録したい(B indexは予め登録しておく)
・A indexとB indexは共通のキー項目が存在しないため、C indexという紐づけindexを予め登録しておき参照した上で登録したい

例:
A indexの投入データの「A_id」が"1111"の場合、
①C indexを参照し「B_id」の"aaaa"を取得

C index
A_id B_id
1111 aaaa

➁B indexを参照し「B_id」の"aaaa"に紐づく「B_comennt」の"test"を取得

B index
B_id B_comennt
aaaa test

③A indexに、「A_id」,「B_id」,「B_comennt」を登録

A index
A_id B_id B_comennt
1111 aaaa test

お手数ですが、回答を宜しくお願い致します。

Logstashのelasticsearchフィルターを使うとご要望のようなことが実現できるかと思います。

https://www.elastic.co/guide/en/logstash/current/plugins-filters-elasticsearch.html#plugins-filters-elasticsearch-query

index_cとindex_bと2回照会すれば出来そうですね。

filter {
    # A indexの投入データの「A_id」が"1111"の場合、
    # ①C indexを参照し「B_id」の"aaaa"を取得
    elasticsearch {
        index => "index_c"
        query => "a_id: 1111"    # ここでは便宜上固定値にしています
        fields => { "b_id" => "b_id" }
        sort => "_id:desc"
        hosts => ["elasticsearch:9200"]
    }

    # ➁B indexを参照し「B_id」の"aaaa"に紐づく「B_comennt」の"test"を取得
    elasticsearch {
        index => "index_b"
        query => "b_id: %{b_id}"
        fields => { "b_comment" => "b_comment" }
        sort => "_id:desc"
        hosts => ["elasticsearch:9200"]
    }
}

filterの結果をoutputで標準出力に出したときがこちら

今回はインプットになる1111を固定値にしたため、以下に含まれていませんが、b_idとb_commentが含まれていることがわかるかと思います。

標準出力ではなく、Elasticsearchに登録するようにすればよろしいのではと。

{
    "@timestamp" => 2020-08-18T14:11:08.369Z,
          "host" => "2ed6b5091dfe",
       "message" => "asdf",
     "b_comment" => "test",
          "b_id" => "aaaa",
      "@version" => "1"
}

早急に回答頂きありがとうございます。

Logstashのelasticsearchフィルターを使うとご要望のようなことが実現できるかと思います。

→上記の実現方法存じていませんでした。試してみます。

ちなみにですが、LogstashではなくElasticSearch(ingest node等)で実現する方法はありますでしょうか?

お手数ですが、再度回答頂けますと幸いです。

Ingest Nodeですと、Enrich Processorがその機能になるかと思います。

概要はこちら
https://www.elastic.co/guide/en/elasticsearch/reference/7.9/ingest-enriching-data.html

今回の例に近い合致したデータを補完するときの参考
https://www.elastic.co/guide/en/elasticsearch/reference/7.9/match-enrich-policy-type.html

match-enrich-policy-type.htmlの方を確認いただくとイメージがわくと思います。

こちらで確認したときの手順を以下に書いて終わります。

補完用のデータの準備

POST index_c/_doc/a
{
  "a_id": "1111",
  "b_id": "aaaa"
}

POST index_b/_doc/a
{
  "b_id": "aaaa",
  "b_comment": "test"
}

Enrich Policyの作成

# index_cに対してa_idをもとに、b_idを付与する
PUT /_enrich/policy/index-c-policy
{
  "match": {
    "indices": "index_c",
    "match_field": "a_id",
    "enrich_fields": ["b_id"]
  }
}

# index_bに対してb_idをもとに、b_commentを付与する
PUT /_enrich/policy/index-b-policy
{
  "match": {
    "indices": "index_b",
    "match_field": "b_id",
    "enrich_fields": ["b_comment"]
  }
}

PolicyのExecuteの実行

Enrich Processorは、直接index_bやindex_cを参照するわけではないため、Enrich用のIndex作成

POST /_enrich/policy/index-c-policy/_execute
POST /_enrich/policy/index-b-policy/_execute

Ingest Pipelineの作成

renameやremoveが入っているのは、Enrichの結果で階層構造になっているのを戻すため。

PUT /_ingest/pipeline/test
{
  "description": "forum test",
  "processors": [
    {
      "enrich": {
        "policy_name": "index-c-policy",
        "field": "a_id",
        "target_field": "b",
        "max_matches": "1"
      }
    },
    {
      "enrich": {
        "policy_name": "index-b-policy",
        "field": "b.b_id",
        "target_field": "b",
        "max_matches": "1"
      }
    },
    { 
      "rename": {
        "field": "b.b_comment",
        "target_field": "b_comment"
      }
    },
    {
      "rename": {
        "field": "b.b_id",
        "target_field": "b_id"
      }
    },
    {
      "remove": {
        "field": "b"
      }
    }
  ]
}

データ投入

PUT /my-index-00001/_doc/1?pipeline=test
{
  "a_id": "1111"
}

確認

GET /my-index-00001/_doc/1
{
  "_index" : "my-index-00001",
  "_type" : "_doc",
  "_id" : "1",
  "_version" : 1,
  "_seq_no" : 0,
  "_primary_term" : 1,
  "found" : true,
  "_source" : {
    "a_id" : "1111",
    "b_id" : "aaaa",
    "b_comment" : "test"
  }
}

上記、早急に回答頂きありがとうございます。
Enrich Processorで実現できることを理解しました。

元々の質問自体は頂いた回答でクリアできたのですが、
Enrich Processorの検証をする中で、解決できない事象が発生しましたので
再度質問させて頂きます。(お手数おかけします…)

◆上記回答の「index_b」のfiledが2階層の場合

補完用のデータの準備

POST index_c/_doc/a
{
  "a_id": "1111",
  "ddd.b_id": "aaaa"
}

POST index_b/_doc/a
{
  "ddd.b_id": "aaaa",
  "ddd.b_comment": "test"
}

Enrich Policyの作成

PUT /_enrich/policy/index-c-policy
{
  "match": {
    "indices": "index_c",
    "match_field": "a_id",
    "enrich_fields": ["ddd.b_id"]
  }
}

PUT /_enrich/policy/index-b-policy
{
  "match": {
    "indices": "index_b",
    "match_field": "ddd.b_id",
    "enrich_fields": ["ddd.b_comment"]
  }
}

PolicyのExecuteの実行

Ingest Pipelineの作成

PUT /_ingest/pipeline/test
{
  "description": "forum test",
  "processors": [
    {
      "enrich": {
        "policy_name": "index-c-policy",
        "field": "a_id",
        "target_field": "b",
        "max_matches": "1"
      }
    },
    {
      "enrich": {
        "policy_name": "index-b-policy",
        "field": "b.ddd.b_id",                     ←★
        "target_field": "b",
        "max_matches": "1"
      }
    },
    { 
      "rename": {
        "field": "b.ddd.b_comment",
        "target_field": "ddd.b_comment"
      }
    },
    {
      "rename": {
        "field": "b.ddd.b_id",
        "target_field": "ddd.b_id"
      }
    },
    {
      "remove": {
        "field": "b"
      }
    }
  ]
}

Simulate

POST _ingest/pipeline/test/_simulate
{
  "docs": [
    {
      "_index": "aaa",
      "_id": "id1",
      "_source": {
        "a_id": "1111",
        "@timestamp": "2020-02-06T06:44:33.758Z"
      }
    }
  ]
}

Simulate結果

{
  "docs" : [
    {
      "error" : {
        "root_cause" : [
          {
            "type" : "illegal_argument_exception",
            "reason" : "field [ddd] not present as part of path [b.ddd.b_id]"
          }
        ],
        "type" : "illegal_argument_exception",
        "reason" : "field [ddd] not present as part of path [b.ddd.b_id]"
      }
    }
  ]
}

おそらく上記★箇所でエラーが発生していると思われます。
enrich processorは、3階層のfiledは処理が出来ないのでしょうか。

3階層のfiledは処理できないとして、試しに★箇所の処理の前にrename処理を移動したところ、
別のエラーが発生しました。

Ingest Pipelineの作成

PUT /_ingest/pipeline/test
{
  "description": "forum test",
  "processors": [
    {
      "enrich": {
        "policy_name": "index-c-policy",
        "field": "a_id",
        "target_field": "b",
        "max_matches": "1"
      }
    },
    {
      "rename": {
        "field": "b.ddd.b_id",
        "target_field": "ddd.b_id"
      }
    },
    {
      "enrich": {
        "policy_name": "index-b-policy",
        "field": "ddd.b_id",
        "target_field": "b",
        "max_matches": "1"
      }
    },
    {
      "rename": {
        "field": "b.ddd.b_comment",
        "target_field": "ddd.b_comment"
      }
    },
    {
      "remove": {
        "field": "b"
      }
    }
  ]
}

Simulate結果

{
  "docs" : [
    {
      "error" : {
        "root_cause" : [
          {
            "type" : "illegal_argument_exception",
            "reason" : "field [b.ddd.b_id] doesn't exist"
          }
        ],
        "type" : "illegal_argument_exception",
        "reason" : "field [b.ddd.b_id] doesn't exist"
      }
    }
  ]
}

エラーを起こさず、index登録する方法はありますでしょうか?
お手数ですが、分かりましたら回答を頂けますと幸いです。

This topic was automatically closed 28 days after the last reply. New replies are no longer allowed.