Filebeat,Logstash,Ingest node複数ログの疎通について

お世話になります。

現在、Filebeat(input logは複数)→Logstash(multiple pipelines)→Elasticsearch(Ingest node)の疎通を試しています。

バージョン…各7.3

◆データ(定義)遷移

【Filebeat】→【Logstash】→【Ingest node】

access.log→squid_access.conf→squid_translate
cache.log→squid_cache.conf→squid_cache

◆input log

access.log
1286536308.880 180 192.168.0.224 TCP_MISS/200 411 GET http://liveupdate.symantecliveupdate.com/minitri.flg - DIRECT/125.23.216.203 text/plain

cacle.log
Max Mem size: 262144 KB

◆設定値

filebeat.yml(各ログ毎にtags設定)

filebeat.inputs:

- type: log
tags: ["access"]
enabled: true
paths:
- /var/log/squid/access*.log

- type: log
tags: ["cache"]
enabled: true
paths:
- /var/log/*.log

#----------------------------- Logstash output --------------------------------
output.logstash:
  # The Logstash hosts
  hosts: ["XX.XXX.XX.XX:5044","XX.XXX.XXX.XXX5044"]

Logstash

pipelines.yml

- pipeline.id: squid_access
pipeline.batch.size: 125
path.config: "/etc/logstash/conf.d/squid_access.conf" 
pipeline.workers: 1

- pipeline.id: squid_cache
pipeline.batch.size: 125
path.config: "/etc/logstash/conf.d/squid_cache.conf" 
pipeline.workers: 1

squid_access.conf

input {
  beats {
    port => 5044
    tags => ["access"]
  }
}
filter{
   if "access" in [tags] {
    mutate {
        copy => { "message" => "split_message" }
    }
    mutate {
        split => ["split_message", " "]
        add_field => { "client_address" => "%{[split_message][2]}" }
        remove_field => ["split_message"]
     }
    translate {
        field => "client_address"
        destination => "blacklist"
        exact => true
        regex => true
        dictionary_path => "/var/tmp/data/translate.yml"
        fallback => "2"
     }
  }
}
output{
   if "access" in [tags] {
        elasticsearch{
           hosts => [ "XX.XXX.XXX.XXX:9200" ]
           user => "XXX"
          password => "XXX"
          pipeline => "squid_translate"
         index => "squid_translate"
        }
   }
}

squid_cache.conf

input {
   beats {
      port => 5044
      tags => ["cache"]
  }
}
filter {
   if "cache" in [tags] {
    }
  }
output{
   if "cache" in [tags] {
      elasticsearch{
      hosts => [ "10.129.189.148:9200" ]
      user => "XXX"
      password => "XXX"
      pipeline => "squid_cache"
      index => "squid_cache"
   }
  }
}

Ingest node

GET /_ingest/pipeline/squid_translate
{
  "squid_translate" : {
    "processors" : [
      {
        "grok" : {
          "field" : "message",
          "patterns" : [
            "%{NUMBER:timestamp}%{SPACE}%{NUMBER:duration} %{IP:client_address} %{WORD:cache_result}/%{POSINT:status_code} %{NUMBER:bytes} %{WORD:request_method} %{NOTSPACE:url} %{NOTSPACE:user} %{WORD:hierarchy_code}/%{NOTSPACE:server} %{NOTSPACE:content_type}"
          ]
        }
      }
    ]
  }
}

GET /_ingest/pipeline/squid_cache
{
  "squid_cache" : {
    "processors" : [
      {
        "kv" : {
          "field" : "message",
          "field_split" : "\n",
          "value_split" : ":"
        }
      }
    ]
  }
}

ご質問
上記設定値にて、Filebeatの/var/log/にcache.logを配置すると以下エラーが発生します。
エラーの原因が分かりましたら教えてください。
各アプリケーション間を「tags」で連携すれば良いのかと思いますが、記載方法が誤っていましたらご指摘頂けますと幸いです。

Logstash log

[INFO ][org.logstash.beats.Server] Starting server on port: 5044
[ERROR][logstash.javapipeline ] A plugin had an unrecoverable error. Will restart this plugin.
Pipeline_id:squid_access
Plugin: <LogStash::Inputs::Beats port=>5044, tags=>["access"], id=>"XXXX", enable_metric=>true, codec=><LogStash::Codecs::Plain id=>"plain_XXX", enable_metric=>true, charset=>"UTF-8">, host=>"0.0.0.0", ssl=>false, add_hostname=>false, ssl_verify_mode=>"none", ssl_peer_metadata=>false, include_codec_tag=>true, ssl_handshake_timeout=>10000, tls_min_version=>1, tls_max_version=>1.2, cipher_suites=>["TLS_ECDHE_ECDSA_WITH_AES_256_GCM_SHA384", "TLS_ECDHE_RSA_WITH_AES_256_GCM_SHA384", "TLS_ECDHE_ECDSA_WITH_AES_128_GCM_SHA256", "TLS_ECDHE_RSA_WITH_AES_128_GCM_SHA256", "TLS_ECDHE_ECDSA_WITH_AES_256_CBC_SHA384", "TLS_ECDHE_RSA_WITH_AES_256_CBC_SHA384", "TLS_ECDHE_RSA_WITH_AES_128_CBC_SHA256", "TLS_ECDHE_ECDSA_WITH_AES_128_CBC_SHA256"], client_inactivity_timeout=>60, executor_threads=>4>
Error: アドレスは既に使用中です
Exception: Java::JavaNet::BindException
Stack: sun.nio.ch.Net.bind0(Native Method)
sun.nio.ch.Net.bind(sun/nio/ch/Net.java:433)
(中略)
[INFO ][org.logstash.beats.Server] Starting server on port: 5044
[INFO ][logstash.outputs.elasticsearch] retrying failed action with response code: 500 ({"type"=>"exception", "reason"=>"java.lang.IllegalArgumentException: java.lang.IllegalArgumentException: Provided Grok expressions do not match field value: [Maximum Resident Size: 0 KB]", "caused_by"=>{"type"=>"illegal_argument_exception", "reason"=>"java.lang.IllegalArgumentException: Provided Grok expressions do not match field value: [Maximum Resident Size: 0 KB]", "caused_by"=>{"type"=>"illegal_argument_exception", "reason"=>"Provided Grok expressions do not match field value: [Maximum Resident Size: 0 KB]"}}, "header"=>{"processor_type"=>"grok"}})

Elasticsearch log

 [DEBUG][o.e.a.b.TransportBulkAction] [XXX] failed to execute pipeline [squid_translate2] for document [squid_translate2/_doc/null]
 org.elasticsearch.ElasticsearchException: java.lang.IllegalArgumentException: java.lang.IllegalArgumentException: Provided Grok expressions do not match field value: [Maximum Resident Size: 0 KB]</strong>

提示されているログの中には2つのエラーが含まれているように思います。

アドレスは既に使用中です

Logstashで、squid_accessとsquid_cacheでパイプラインを分けて処理しようとされていますが、
両方のInputのところを見ますと、2つともbeatsでポート5044で待ち受けとなっています。

このため、ポートが競合しまい、エラーになっているかと思います。
そのせいで、2つ独立したLogstashのPipelineを期待しているところですが、実は1つしか動いていないという状態(今回はaccessを処理する方)になっているのでしょう。

Grok Processorのエラー

エラーメッセージの中では、squid_translate(tagでaccessとつけたものを処理したい方)に、cacheの方の文字列がきていることが分かります。

Maximum Resident Size: 0 KB

このため、期待するフォーマットと違うためにGrok処理でエラーになっているかと思います。
理由は上でも書いた通り、cacheのデータは、LogstashのCache用パイプラインに流れてほしいと思っているところ、access用のパイプラインに来てしまっているため、以降の処理が期待しない動作になってきていると考えます。

回避案1

https://www.elastic.co/guide/en/logstash/current/multiple-pipelines.html#multiple-pipeline-usage

このUsage Considerationを見ますと、InputやFilterで同じものがない分離されたものであれば、Multiple Pipelineが便利に使えるよとあります。
今回の場合 、InputはBeatsで同じところを見ている構成です。なので、Pipelineにせず1本化するとシンプルになるのではないかと思います。

具体的には・・・

input {
  beats {
    port => 5044
    # tags => ["access"] ← filebeat送信時にタグは既に追加されているため、この段階でさらに追加する必要はない
  }
}

filter部分についてはtagの中身で処理を分けてやります。
outputも同様に分けて処理すれば、Ingest Pipelineに渡したい中身と違うものが渡されることがなくなるかと思います。

filter{
   if "access" in [tags] {
     # 何か処理
   }
   if "cache" in [tags] {
     # 何か処理
   }
}

早急に回答頂きありがとうございます。

Multiple Pipelineでは、複数の処理で同じポートを指定するとエラーになるのですね。
認識不足でした。

提示頂いた回避案で解決しました。
大変助かりました。