Ingestnodeのエラー処理について

お世話になっております。

Ingest node Pipelineのon_failureの設定方法について質問させて頂きます。

下記のようにPipelineAからPipelineB、PipelineBからPipelineCを呼び出す処理を検討しています。

PUT /_ingest/pipeline/PipelineA
{
  "processors": [
    {
      "pipeline": {
        "if": "ctx.aws.s3.bucket.name == 'aaa'",
        "name": "PipelineB" 
      }
    }
  ]
}

PUT _ingest/pipeline/PipelineB
{
  "processors": [
    {
      "pipeline": {
        "name": "PipelineC" 
      },
      "date_index_name": {
        "if": "ctx.aws.s3['object.key'].startsWith('aaa/bbb')",
        "field": "@timestamp",
        "index_name_prefix": "test001",
        "date_rounding": "M" 
      }
    }
  ]
}

PUT _ingest/pipeline/PipelineC
{
  "processors": [
    {
      "dissect": { (省略) }
    }
  ]
}

PipelineA、B、Cの全処理でのエラー発生時を考慮して、on_failure句を設定したいのですが、
設定方法としては以下のどちらが正しいのでしょうか。

①PipelineA、B、Cそれぞれにon_failu句を記載
②PipelineA(トップ階層)にon_failu句を記載

お手数ですが、回答頂けますと幸いです。

@harue さん

on_failure 句の設定ですが、Javaなどのプログラミングにおけるtry/catchの考え方と同じだと考えてみるとしっくりくるかもしれません。

エラー発生時に以下の2点を考慮してみると良いと思います。

  1. 代替え処理があるか
  2. 処理継続可能か

on_failue はプロセッサ単位にも指定できます。これが一番細かい粒度なので、特定の処理で特定の代替え処理が必要なら、プロセッサでon_failureを定義します。
その後、処理継続困難なのであれば、on_failure の中で fail プロセッサを使って、上位のレイヤにエラーを投げ返してあげます。
特定の処理で、個別の代替え処理が必要ないのなら、そこではon_failureは記載しなくて良いでしょう。

例としてこんなパイプラインを作ってみました。

# 最上位のパイプライン、個別解決できない場合の例外処理を実装。
# ここで、例外は吸収されるのでドキュメントインデックスの処理は正常に終わる。
# ここでもfailを使うかon_failureを実装しなければ、ドキュメントインデックスのリクエスト自体がエラーになる。
PUT _ingest/pipeline/PipelineA
{
  "processors": [{"pipeline": {"name": "PipelineB"}}],
  "on_failure": [
    {
      "set": {
        "field": "caught_at_A",
        "value": "{{ _ingest.on_failure_message }}"
      }}]}
# on_failureで例外処理を実行しつつ、上位にfailで継続不可を伝える例
PUT _ingest/pipeline/PipelineB
{
  "processors": [
    {
      "pipeline": {"name": "PipelineC"},
      "set": {"field": "B","value": 1}
    }
  ],
  "on_failure": [
    {
      "set": {
        "field": "caught_at_B",
        "value": "{{ _ingest.on_failure_message }}"
      }
    },
    {
      "fail": {
        "message": "B caught a failure"
      }}]}
# テストするにはfailプロセッサで正常ルートのとある箇所からエラーを発生させると便利
PUT _ingest/pipeline/PipelineC
{
  "processors": [
    {"set": {"field": "C","value": 1}},
    {
      "fail": {
        "message": "failed at C"
      }}]}
# ドキュメントインデックスして
PUT test_index/_doc/1?pipeline=PipelineA
{}

# 結果をみてみる
GET test_index/_doc/1

Cで発生したエラーをBで検知して、例外処理を行った後、さらにエラーを投げ直し、Aが別の例外処理を行っています。

{
    "C" : 1,
    "caught_at_B" : "failed at C",
    "caught_at_A" : "B caught a failure"
  }

参考になれば幸いです。

ご回答頂きありがとうございます。

on_failure 句の仕組みが大変わかりました。

参考にさせて頂きます。

This topic was automatically closed 28 days after the last reply. New replies are no longer allowed.