I am using Filebeat to collect logs that Docker containers write to stdout, ship them to Logstash, and from there to Elasticsearch.
The data flow is: Docker stdout -> Filebeat (autodiscover) -> Logstash -> Elasticsearch.
Logstash: docker.elastic.co/logstash/logstash:6.8.23
Filebeat: docker.elastic.co/beats/filebeat:7.17.7
Similar to the issue "filebeats truncates log messages after 16k" (Issue #6605 · elastic/beats · GitHub), messages with fields bigger than roughly 8191 characters (approximate: I simply copied the characters of a truncated field and counted them) are split into multiple lines on stdout by Docker.
To test this I started the solution on my local machine and looked at the logs flowing from Docker to Filebeat and Logstash, and I can see that they are indeed split up by Docker. However, the GitHub issue suggests a workaround was provided a long time ago (more on that below), and it does not seem to work for me.
Is there anything I can do at this stage? Here is a sample of what Logstash receives:
logstash_1 | {"stream":"stderr","tags":["beats_input_codec_plain_applied","_grokparsefailure"],"log":{"offset":81392,"file":{"path":"/var/lib/docker/containers/efdf555f795a42198b91c1eee332e ............
The interesting part is that it shows a message with an offset, and these messages are indeed parts of the full message that Logstash receives from Filebeat.
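For reference, as far as I can tell the workaround from that issue is Filebeat's partial-line reassembly: the old docker input gained a combine_partial option (enabled by default), and its replacement in 7.x, the container input, joins Docker's 16 KB partial lines automatically. A minimal standalone sketch of the latter, assuming the default json-file logging driver and the standard container log path (this is not my actual setup, which uses autodiscover and is shown below):

filebeat.inputs:
  # the container input parses Docker json-file logs and reassembles partial lines
  - type: container
    paths:
      - /var/lib/docker/containers/*/*.log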
My Filebeat and Logstash configurations are fairly standard:
Filebeat
filebeat.autodiscover:
  providers:
    - type: docker
      hints.enabled: true
      hints.default_config.enabled: false

output.logstash:
  hosts: ["logstash:5044"]

processors:
  # decode the log field (a sub JSON document) if JSON encoded, then map its fields to Elasticsearch fields
  - decode_json_fields:
      fields: ["log", "message"]
      target: ""
      # overwrite existing target Elasticsearch fields while decoding JSON fields
      overwrite_keys: true
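Since I am using autodiscover, one variant I have considered is spelling out the default config template, so that the partial-line-aware container input is definitely the one reading the log files. A sketch, assuming the docker provider's ${data.docker.container.id} variable and the standard log path; the enabled: false line is meant to mirror my current hints.default_config.enabled: false:

filebeat.autodiscover:
  providers:
    - type: docker
      hints.enabled: true
      # expanded form of hints.default_config, pinning the input type explicitly
      hints.default_config:
        type: container
        enabled: false
        paths:
          - /var/lib/docker/containers/${data.docker.container.id}/*.log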
Logstash
input {
  beats {
    port => 5044
    host => "0.0.0.0"
  }
}

filter {
  if "/ping" in [message] {
    drop { }
  }

  grok {
    match => [ "message", "%{MONOLOG} %{GREEDYDATA:mymessage}" ]
    patterns_dir => ["/usr/share/logstash/patterns", "/usr/share/logstash/patterns_extra"]
    add_field => {
      "type" => "monolog"
    }
  }

  if "_grokparsefailure" in [tags] {
    grok {
      match => { "message" => "%{DATESTAMP:timestamp} \[%{LOGLEVEL:log-level}\] \[(?<app>[A-Za-z0-9.\s]*?)\] %{GREEDYDATA:message}" }
      patterns_dir => ["/usr/share/logstash/patterns", "/usr/share/logstash/patterns_extra"]
      add_field => {
        "type" => "yii"
      }
    }
  }
}

output {
  if ![@metadata][beat] {
    elasticsearch {
      hosts => ["${ES_HOST}"]
      index => "wiz_myapp"
    }
    stdout { codec => rubydebug }
  }
  else if "wizmyappprod" in [container][name] {
    elasticsearch {
      hosts => ["${ES_HOST}"]
      manage_template => false
      index => "wiz_myapp_prod-%{[@metadata][beat]}-%{[@metadata][version]}-%{+YYYY.MM}"
    }
    stdout { codec => json }
  }
  else if "wizmyappstaging" in [container][name] {
    elasticsearch {
      hosts => ["${ES_HOST}"]
      manage_template => false
      index => "wiz_myapp_staging-%{[@metadata][beat]}-%{[@metadata][version]}-%{+YYYY.MM}"
    }
    stdout { codec => json }
  }
  else if "wizmyappmirror" in [container][name] {
    elasticsearch {
      hosts => ["${ES_HOST}"]
      manage_template => false
      index => "wiz_myapp_mirror-%{[@metadata][beat]}-%{[@metadata][version]}-%{+YYYY.MM}"
    }
    stdout { codec => json }
  }
  else {
    elasticsearch {
      hosts => ["${ES_HOST}"]
      manage_template => false
      index => "wiz_myapp_other-%{[@metadata][beat]}-%{[@metadata][version]}-%{+YYYY.MM}"
    }
    stdout { codec => json }
  }
}