Ingest node GrokにてArraylistの値を受け取りエラー

LogstashからElasticsearch(Ingest node)の連携について、ご質問させて頂きます。

実現したいこと

・Logstashでtanslateを実施(前段階としてmutateのsplit,add_fieldを使い特定の値をfieldにセット)
・Elasticsearch Ingest nodeにて全体値をGrok

Logstash config

input {
  beats {
    port => XXXX
  }
}
filter {
    mutate {
        split => ["message", " "]
        add_field => { "client_address" => "%{[message][2]}" }
    }
}
    translate {
        field => "client_address"
        destination => "blacklist"
        exact => true
        regex => true
        dictionary_path => "/var/tmp/data/translate.yml"
        fallback => "2"
  }
output {    
    elasticsearch{
        hosts => [ "XX.XXX.XXX.XXX:XXXX" ]
        user => "XXXXX"
        password => "XXXXX"
       pipeline => "squid_translate"
    }
}

Elasticsearch Ingest node

PUT /_ingest/pipeline/squid_translate
{
  "processors": [
    {
      "grok": {
        "field": "message",
        "patterns": [
          "%{NUMBER:timestamp}%{SPACE}%{NUMBER:duration} %{WORD:cache_result}/{POSINT:status_code} %{WORD:request_method} %{NOTSPACE:url} %{NOTSPACE:user} %{WORD:hierarchy_code}/{NOTSPACE:server} %{NOTSPACE:content_type}"
        ]
      }
    }
  ]
}

上記を元に処理を実行すると、Elasticsearchにて以下のエラーが発生します。
spritにてmessageがarraylist型になったが、Ingest nodeのGrokがString型にて受け取ろうとしたため、型違いでエラーになったようです。

[2019-09-25T11:30:35,562][DEBUG][o.e.a.b.TransportBulkAction]  failed to execute pipeline [squid_translate] for document [squid_translate/_doc/null]
org.elasticsearch.ElasticsearchException: java.lang.IllegalArgumentException: java.lang.IllegalArgumentException: field [message] of type [java.util.ArrayList] cannot be cast to [java.lang.String]
at org.elasticsearch.ingest.CompoundProcessor.newCompoundProcessorException(CompoundProcessor.java:194) ~[elasticsearch-7.3.2.jar:7.3.2]
at org.elasticsearch.ingest.CompoundProcessor.execute(CompoundProcessor.java:133) ~[elasticsearch-7.3.2.jar:7.3.2]
at org.elasticsearch.ingest.Pipeline.execute(Pipeline.java:100) ~[elasticsearch-7.3.2.jar:7.3.2]
at org.elasticsearch.ingest.IngestService.innerExecute(IngestService.java:427) ~[elasticsearch-7.3.2.jar:7.3.2]
at org.elasticsearch.ingest.IngestService.access$100(IngestService.java:70) ~[elasticsearch-7.3.2.jar:7.3.2]
at org.elasticsearch.ingest.IngestService$3.doRun(IngestService.java:355) [elasticsearch-7.3.2.jar:7.3.2]
at org.elasticsearch.common.util.concurrent.ThreadContext$ContextPreservingAbstractRunnable.doRun(ThreadContext.java:758) [elasticsearch-7.3.2.jar:7.3.2]
at org.elasticsearch.common.util.concurrent.AbstractRunnable.run(AbstractRunnable.java:37) [elasticsearch-7.3.2.jar:7.3.2]
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128) [?:?]
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628) [?:?]
at java.lang.Thread.run(Thread.java:835) [?:?]
Caused by: java.lang.IllegalArgumentException: java.lang.IllegalArgumentException: field [message] of type [java.util.ArrayList] cannot be cast to [java.lang.String]
... 11 more
Caused by: java.lang.IllegalArgumentException: field [message] of type [java.util.ArrayList] cannot be cast to [java.lang.String]
at org.elasticsearch.ingest.IngestDocument.cast(IngestDocument.java:550) ~[elasticsearch-7.3.2.jar:7.3.2]
at org.elasticsearch.ingest.IngestDocument.getFieldValue(IngestDocument.java:116) ~[elasticsearch-7.3.2.jar:7.3.2]
at org.elasticsearch.ingest.IngestDocument.getFieldValue(IngestDocument.java:131) ~[elasticsearch-7.3.2.jar:7.3.2]
at org.elasticsearch.ingest.common.GrokProcessor.execute(GrokProcessor.java:58) ~[?:?]
at org.elasticsearch.ingest.CompoundProcessor.execute(CompoundProcessor.java:123) ~[elasticsearch-7.3.2.jar:7.3.2]
... 9 more

ご質問
上記エラーを解決する方法が分かりましたら教えてください。

messageが配列になってしまうことが問題でしたら、mutateでsplitする前に、フィールドを退避しておくことも考えられますね。

filter {
    mutate {
        # messageをsplit用のフィールドにコピーする。messageはオリジナルの値のまま
        copy => { "message" => "split_message" }
    }
    # あとはコピーしたフィールドに対してsplit処理を実施
    mutate {
        split => ["split_message", " "]
        add_field => { "client_address" => "%{[split_message][2]}" }
        remove_field => ["split_message"]
    }
}

ただ単純な疑問としては、LogstashのfilterとIngestのPipelineを併用する意図はなんだろう?と気になります。
Ingest Pipelineの方に1本化することも出来ると思います。

早急に回答頂きありがとうございます。

messageが配列になってしまうことが問題でしたら、mutateでsplitする前に、フィールドを退避しておくことも考えられますね。
→上記方法で解決しました。大変助かりました。

ただ単純な疑問としては、LogstashのfilterとIngestのPipelineを併用する意図はなんだろう?と気になります。
Ingest Pipelineの方に1本化することも出来ると思います。
→おっしゃる通り、本来だとIngest nodeにてtranslateとGrokの処理を行いたいのですが、
Ingest nodeにtranslateの機能は存在しないと認識しております。
そのため、Logstashにてtranslate処理の部分は行いIngest nodeでGrok処理を行っています。

追加のご質問
Ingest nodeにtranslateの機能が存在するということであれば、ぜひ教えて頂きたいです。

なるほど。translateのような機能は現時点で提供されていないようですので、Logstashと併用される意図がよくわかりました。

両者の使い分け?については、日本語でもブログがあがっていたのでご紹介まで。

返信が遅くなり申し訳ありません。

ブログをご紹介頂きありがとうございます。参照させて頂きます。

This topic was automatically closed 28 days after the last reply. New replies are no longer allowed.