harue
(harue)
January 27, 2020, 2:25am
1
お世話になっております。
Ingest node Pipelineのon_failureについて質問させて頂きます。
下記のように
PipelineAからPipelineB、PipelineBからPipelineCを呼び出す処理を実施しています。
取込ファイルは、S3→Filebeat→Elasticsearch(ingest node)の順でindex登録しています。
PUT /_ingest/pipeline/PipelineA
{
"processors": [
{
"pipeline": {
"if": "ctx.aws.s3.bucket.name == 'aaa'",
"name": "PipelineB"
}
}
],
"on_failure": [
{
"set": {
"field": "_index",
"value": "failed-{{ _index }}"
}
}
]
}
PUT _ingest/pipeline/PipelineB
{
"processors": [
{
"pipeline": {
"name": "PipelineC"
},
"date_index_name": {
"if": "ctx.aws.s3['object.key'].startsWith('aaa/bbb')",
"field": "@timestamp",
"index_name_prefix": "test001",
"date_rounding": "M"
}
}
],
"on_failure": [
{
"set": {
"field": "_index",
"value": "failed-{{ _index }}"
}
}
]
}
PUT _ingest/pipeline/PipelineC
{
"processors": [
{
"dissect": { (省略) }
}
],
"on_failure": [
{
"set": {
"field": "_index",
"value": "failed-{{ _index }}"
}
}
]
}
以下事象の場合、
on_failure句処理を実施し、"failed-{{ _index }}"が作成されるはずがindexが作成されません。
・PipelineCのパイプラインを削除後、S3にファイルをアップロード
→PipelineBにて呼び出し先が不明でエラーとなるはず
・空データのファイルをS3にアップロード
→PipelineCのdissect processorsにてエラーとなるはず
なおPipelineのsimulateにて上記を実施した場合は、
index名が"failed-{{ _index }}"に更新されエラーを発生させることが出来ました。
原因として考えられるものについて、分かりましたらご教示をお願い致します。
tsgkdt
(tsgkdt)
January 27, 2020, 3:54am
2
事象をシンプルにするため、最低限のデータの準備と、登録はKibanaのDevToolで行いましたが、
PipelincCを削除後、データを投入したところSimulateで実行した場合と同じ結果になりました。
Filebeat→Elasticsearchのところは、本質的には、Kibana DevToolsやcurlを用いてPOST hoge/_doc/1
のようにデータを投入するのと変わらないはずですので、
簡易な手段で再現できる方法がありますと、ヒントが見つかるかもしれません。
こちらでは、7.5.0で確認しています。以下、確認したときに使用した内容です。
PUT /_ingest/pipeline/PipelineA
{
"processors": [
{
"pipeline": {
"if": "ctx.aws.s3.bucket.name == 'aaa'",
"name": "PipelineB"
}
}
],
"on_failure": [
{
"set": {
"field": "_index",
"value": "failed-{{ _index }}"
}
}
]
}
PUT _ingest/pipeline/PipelineB
{
"processors": [
{
"pipeline": {
"name": "PipelineC"
},
"date_index_name": {
"if": "ctx.aws.s3['object.key'].startsWith('aaa/bbb')",
"field": "@timestamp",
"index_name_prefix": "test001",
"date_rounding": "M"
}
}
],
"on_failure": [
{
"set": {
"field": "_index",
"value": "failed_b-{{ _index }}"
}
}
]
}
DELETE _ingest/pipeline/PipelineC
PUT _ingest/pipeline/PipelineC
{
"processors": [
{
"dissect": {
"field": "message",
"pattern" : "%{test-a} %{test-b} %{test-c} %{test-d} %{test-e}"
}
}
],
"on_failure": [
{
"set": {
"field": "_index",
"value": "failed_c-{{ _index }}"
}
},
{
"set": {
"field": "result-c",
"value": "failed_c-{{ _index }}"
}
}
]
}
simulate
POST _ingest/pipeline/PipelineA/_simulate
{
"docs": [
{
"_index": "test001-2020.01.27",
"_id": "id",
"_source":
{
"@timestamp": "2016-04-25T12:02:01.357Z",
"message": "this is a test .",
"aws": {
"s3": {
"bucket": {
"name": "aaa"
},
"object.key": "aaa/bbb"
}
}
}
}
]
}
データ投入
POST forum0127/_doc/1?pipeline=PipelineA
{
"@timestamp": "2016-04-25T12:02:01.357Z",
"message": "this is a test .",
"aws": {
"s3": {
"bucket": {
"name": "aaa"
},
"object.key": "aaa/bbb"
}
}
}
harue
(harue)
January 30, 2020, 4:50am
3
返信が遅くなり申し訳ありません。
回答頂きありがとうございます。
Kibana DevToolsでPOSTでデータを投入、filebeatログにてElasticsearchからの戻り値を確認したところ原因が分かりました。
原因としては、PipelineBのon_failure処理でindex名を更新する箇所でエラーになっていました。
大変参考になりました。
system
(system)
Closed
February 27, 2020, 4:50am
4
This topic was automatically closed 28 days after the last reply. New replies are no longer allowed.