Ingest: foreachとgrokを組み合わせた場合の処理について

知りたいこと

Ingest Pipelineでforeachの中でgrokを適用し、結果をarrayとして保存する方法について

状況の説明

括弧内にログのコード、次にログのメッセージからなる文字列が、カンマ区切りで出力されているテキストがあります。

(例)

[INFO.0001]あいうえおかきくけこ, [INFO.0002]さしすせそ

これを、Ingest Pipelineでこのようにパースしたいと考えています。

期待値

{
  "hoge": ["INFO.0001", "INFO.0002"],
  "fuga": ["あいうえおかきくけこ", "さしすせそ"]
}

try

まず、splitで配列にし、foreach内でgrokによりコードとメッセージを取るように指定することを考えました。
これだと、hoge, fugaのフィールドに対して最後のループのときの値がセットされるだけです。

 {
      "split": {
        "field": "message",
        "separator": ","
      }
    },
    {
      "foreach": {
        "field": "message",
        "processor" : {
          "grok": {
            "field": "_ingest._value",
            "patterns": ["\\[%{NOTSPACE:hoge}\\]%{GREEDYDATA:fuga}"]
          }
        }
      } 
    }

actual result

{
  "hoge": "INFO.0002",
  "fuga": "さしすせそ"
}

try2

また、値の上書きとならないよう、foreachの中でgrokの次にappendを指定した場合には、

      "foreach": {
        "field": "biz.temp.exception",
        "processor" : {
          "grok": {
            "field": "_ingest._value",
            "patterns": ["\\[%{NOTSPACE:hoge}\\]%{GREEDYDATA:fuga}"]
          },
          "append": {
            "field": "hoge_array",
            "value": "%{hoge}"
          }
        }

result

"type":"parse_exception","reason":"[processor] Must specify exactly one processor type","hea
der":{"processor_type":"foreach","property_name":"processor"}}

このようなエラーとなり、foreachのprocessorは1つしか指定してはいけないと返されます。

調べたこと

同じようなトピックが既にありましたが回答がついておらず、自動クローズされていました。

foreach自身は複数書けることが分かりましたが、grokとの組み合わせには言及されていませんでした。

試している環境

ES, Kibana, 6.0.0-beta2

残念ながら、ぱっと見では、Grokでの対応はできなそうかと。
ただし、LogstashのGrokの場合はまた状況が違いそうです。

後は、Script processorでPainlessで処理する形が妥当な気がします。

アドバイスありがとうございます。

Script processorで試したところ、期待する動作を確認できました。
simulateの結果を貼っておきます。

POST _ingest/pipeline/_simulate
{
  "pipeline": {
    "processors": [
      {
        "split": {
          "field": "message",
          "separator": ","
        },
        "script": {
          "lang": "painless",
          "source": """ctx.logcode = []; ctx.logmessage = []; for (line in ctx.message) { def matcher = /\[(.*?)\](.*)/.matcher(line); if (matcher.find()) { ctx.logcode.add(matcher.group(1).trim()); ctx.logmessage.add(matcher.group(2).trim());}}"""
        },
        "remove": {
          "field": "message"
        }
      }
    ]
  },
  "docs": [
    {
      "_source": {
        "message": "[INFO.0001]あいうえおかきくけこ, [INFO.0002]さしすせそ"
      }
    }
  ]
}

結果

{
  "docs": [
    {
      "doc": {
        "_index": "_index",
        "_type": "_type",
        "_id": "_id",
        "_source": {
          "logmessage": [
            "あいうえおかきくけこ",
            "さしすせそ"
          ],
          "logcode": [
            "INFO.0001",
            "INFO.0002"
          ]
        },
        "_ingest": {
          "timestamp": "2017-10-03T00:38:19.594Z"
        }
      }
    }
  ]
}
  • 動作確認環境

    ES, Kibana 6.0.0-RC1

This topic was automatically closed 28 days after the last reply. New replies are no longer allowed.