Filebeat/Logstash output splits messages into multiple events with a maximum field size of approximately 8191 characters

I am using Filebeat to collect logs written by Docker containers, ship them to Logstash, and from there to Elasticsearch.
The data flow is: Docker stdout -> Filebeat (autodiscover) -> Logstash -> Elasticsearch

Logstash docker.elastic.co/logstash/logstash:6.8.23
Filebeat docker.elastic.co/beats/filebeat:7.17.7

Similar to the issue "filebeats truncates log messages after 16k" (elastic/beats#6605 on GitHub), messages with fields larger than approximately 8191 characters (approximate, because I copied the contents of a truncated field and counted the characters) are split by Docker into multiple lines on stdout.

To test this I started the stack on my local machine and followed the logs from Docker through Filebeat and Logstash, and I can see that the messages are indeed split by Docker. The GitHub issue suggests a workaround was provided a long time ago, but it does not seem to work for me.
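
If I read that issue correctly, the workaround boils down to letting Filebeat join the chunks back together before shipping them: the docker input has a combine_partial option for this, and the newer container input is, as far as I can tell, supposed to do the joining on its own. A minimal sketch of the standalone docker-input variant (not my actual setup, which uses autodiscover):

filebeat.inputs:
  - type: docker
    # join the partial lines Docker writes when it splits a long message
    combine_partial: true
    containers:
      ids:
        # "*" as a stand-in for "all containers" is my assumption; a concrete container ID would also work
        - "*"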

Anything I can do at this stage?

logstash_1  | {"stream":"stderr","tags":["beats_input_codec_plain_applied","_grokparsefailure"],"log":{"offset":81392,"file":{"path":"/var/lib/docker/containers/efdf555f795a42198b91c1eee332e ............

The interesting part here is that the event carries a log offset, and events like this are indeed fragments of the full message that Logstash receives from Filebeat.
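
For context, this matches my understanding of how Docker's json-file log driver handles long lines (roughly every 16 KB, per the issue above): the chunks end up as consecutive JSON entries in /var/lib/docker/containers/<id>/<id>-json.log, and only the last chunk's log value ends with a newline. An illustrative example (made up, not copied from my logs):

{"log":"start of a very long message ...","stream":"stdout","time":"2022-11-01T10:00:00.000000000Z"}
{"log":"... end of the same message\n","stream":"stdout","time":"2022-11-01T10:00:00.000000001Z"}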

My Filebeat and Logstash configurations are fairly standard:

Filebeat

filebeat.autodiscover:
  providers:
    - type: docker
      hints.enabled: true
      hints.default_config.enabled: false

output.logstash:
  hosts: ["logstash:5044"]

processors:
  # decode the log field (a sub JSON document) if it is JSON encoded, then map its fields to Elasticsearch fields
  - decode_json_fields:
      fields: ["log", "message"]
      target: ""
      # overwrite existing target elasticsearch fields while decoding json fields
      overwrite_keys: true
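
In case it matters, this is roughly the shape I would expect the autodiscover variant of the workaround to take, based on the autodiscover documentation: keep the default config disabled so that only containers with hints are collected, but give it a container input template, which (as far as I understand) also joins the split lines. The paths value below is the documented default and an assumption on my part:

filebeat.autodiscover:
  providers:
    - type: docker
      hints.enabled: true
      hints.default_config:
        # still only collect containers that opt in via hints
        enabled: false
        # the container input parses the Docker JSON log files and should join partial lines
        type: container
        paths:
          - /var/lib/docker/containers/${data.docker.container.id}/*.log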

Logstash

input {
  beats {
    port => 5044
    host => "0.0.0.0"
  }
}

filter {
  if "/ping" in [message] {
    drop { }
  }
  grok {
    match => { "message" => "%{MONOLOG} %{GREEDYDATA:mymessage}" }
    patterns_dir => ["/usr/share/logstash/patterns", "/usr/share/logstash/patterns_extra"]
    add_field => {
      "type" => "monolog"
    }
  }
  if "_grokparsefailure" in [tags] {
    grok {
      match => { "message" => "%{DATESTAMP:timestamp} \[%{LOGLEVEL:log-level}\] \[(?<app>[A-Za-z0-9.\s]*?)\] %{GREEDYDATA:message}" }
      patterns_dir => ["/usr/share/logstash/patterns", "/usr/share/logstash/patterns_extra"]
      add_field => {
        "type" => "yii"
      }
    }
  }
}
output {
    if ![@metadata][beat] {
        elasticsearch {
            hosts => ["${ES_HOST}"]
            index => "wiz_myapp"
        }

        stdout { codec => rubydebug }
    }
    else if "wizmyappprod" in [container][name] {
        elasticsearch {
            hosts => ["${ES_HOST}"]
            manage_template => false
            index => "wiz_myapp_prod-%{[@metadata][beat]}-%{[@metadata][version]}-%{+YYYY.MM}"
        }

        stdout { codec => json }
    }
    else if "wizmyappstaging" in [container][name] {
        elasticsearch {
            hosts => ["${ES_HOST}"]
            manage_template => false
            index => "wiz_myapp_staging-%{[@metadata][beat]}-%{[@metadata][version]}-%{+YYYY.MM}"
        }

        stdout { codec => json }
    }
    else if "wizmyappmirror" in [container][name] {
        elasticsearch {
            hosts => ["${ES_HOST}"]
            manage_template => false
            index => "wiz_myapp_mirror-%{[@metadata][beat]}-%{[@metadata][version]}-%{+YYYY.MM}"
        }

        stdout { codec => json }
    }
    else {
        elasticsearch {
            hosts => ["${ES_HOST}"]
            manage_template => false
            index => "wiz_myapp_other-%{[@metadata][beat]}-%{[@metadata][version]}-%{+YYYY.MM}"
        }

        stdout { codec => json }
    }
}
