Ingest nodeのon_failure処理について

お世話になっております。

Ingest node Pipelineのon_failureについて質問させて頂きます。

下記のように
PipelineAからPipelineB、PipelineBからPipelineCを呼び出す処理を実施しています。

取込ファイルは、S3→Filebeat→Elasticsearch(ingest node)の順でindex登録しています。

PUT /_ingest/pipeline/PipelineA
{
  "processors": [
    {
      "pipeline": {
        "if": "ctx.aws.s3.bucket.name == 'aaa'",
        "name": "PipelineB"
      }
    }
  ],
  "on_failure": [
    {
      "set": {
        "field": "_index",
        "value": "failed-{{ _index }}"
      }
    }
  ]
}

PUT _ingest/pipeline/PipelineB
{
  "processors": [
    {
      "pipeline": {
        "name": "PipelineC" 
      },
      "date_index_name": {
        "if": "ctx.aws.s3['object.key'].startsWith('aaa/bbb')",
        "field": "@timestamp",
        "index_name_prefix": "test001",
        "date_rounding": "M" 
      }
    }
  ],
  "on_failure": [
    {
      "set": {
        "field": "_index",
        "value": "failed-{{ _index }}"
      }
    }
  ]
}

PUT _ingest/pipeline/PipelineC
{
  "processors": [
    {
      "dissect": { (省略) }
    }
  ],
  "on_failure": [
    {
      "set": {
        "field": "_index",
        "value": "failed-{{ _index }}"
      }
    }
  ]
}

以下事象の場合、
on_failure句処理を実施し、"failed-{{ _index }}"が作成されるはずがindexが作成されません。

・PipelineCのパイプラインを削除後、S3にファイルをアップロード
→PipelineBにて呼び出し先が不明でエラーとなるはず

・空データのファイルをS3にアップロード
→PipelineCのdissect processorsにてエラーとなるはず

なおPipelineのsimulateにて上記を実施した場合は、
index名が"failed-{{ _index }}"に更新されエラーを発生させることが出来ました。

原因として考えられるものについて、分かりましたらご教示をお願い致します。

事象をシンプルにするため、最低限のデータの準備と、登録はKibanaのDevToolで行いましたが、
PipelincCを削除後、データを投入したところSimulateで実行した場合と同じ結果になりました。

Filebeat→Elasticsearchのところは、本質的には、Kibana DevToolsやcurlを用いてPOST hoge/_doc/1のようにデータを投入するのと変わらないはずですので、
簡易な手段で再現できる方法がありますと、ヒントが見つかるかもしれません。

こちらでは、7.5.0で確認しています。以下、確認したときに使用した内容です。

PUT /_ingest/pipeline/PipelineA
{
  "processors": [
    {
      "pipeline": {
        "if": "ctx.aws.s3.bucket.name == 'aaa'",
        "name": "PipelineB"
      }
    }
  ],
  "on_failure": [
    {
      "set": {
        "field": "_index",
        "value": "failed-{{ _index }}"
      }
    }
  ]
}

PUT _ingest/pipeline/PipelineB
{
  "processors": [
    {
      "pipeline": {
        "name": "PipelineC" 
      },
      "date_index_name": {
        "if": "ctx.aws.s3['object.key'].startsWith('aaa/bbb')",
        "field": "@timestamp",
        "index_name_prefix": "test001",
        "date_rounding": "M" 
      }
    }
  ],
  "on_failure": [
    {
      "set": {
        "field": "_index",
        "value": "failed_b-{{ _index }}"
      }
    }
  ]
}

DELETE _ingest/pipeline/PipelineC

PUT _ingest/pipeline/PipelineC
{
  "processors": [
    {
      "dissect": {
          "field": "message",
          "pattern" : "%{test-a} %{test-b} %{test-c} %{test-d} %{test-e}"
      }
    }
  ],
  "on_failure": [
    {
      "set": {
        "field": "_index",
        "value": "failed_c-{{ _index }}"
      }
    },
    {
      "set": {
        "field": "result-c",
        "value": "failed_c-{{ _index }}"
      }
    }
  ]
}

simulate

POST _ingest/pipeline/PipelineA/_simulate
{
  "docs": [
    {
      "_index": "test001-2020.01.27",
      "_id": "id",
      "_source": 
      {
        "@timestamp": "2016-04-25T12:02:01.357Z",
        "message": "this is a test .",
        "aws": {
          "s3": {
            "bucket": {
              "name": "aaa"
            },
            "object.key": "aaa/bbb"
          }
        }
      }
    }
  ]
}

データ投入

POST forum0127/_doc/1?pipeline=PipelineA
{
  "@timestamp": "2016-04-25T12:02:01.357Z",
  "message": "this is a test .",
  "aws": {
    "s3": {
      "bucket": {
        "name": "aaa"
      },
      "object.key": "aaa/bbb"
    }
  }
}

返信が遅くなり申し訳ありません。

回答頂きありがとうございます。

Kibana DevToolsでPOSTでデータを投入、filebeatログにてElasticsearchからの戻り値を確認したところ原因が分かりました。

原因としては、PipelineBのon_failure処理でindex名を更新する箇所でエラーになっていました。

大変参考になりました。

This topic was automatically closed 28 days after the last reply. New replies are no longer allowed.