I have the following situation:
I'm sending logs from my servers to Logstash using Filebeat. I'm running a Logstash pipeline that looks like this:
# Receive events shipped by Filebeat over the Beats protocol.
input {
  beats {
    port => 5003
  }
}
filter {
  # Parse the raw (possibly multiline) application log line.
  # (?m) lets GREEDYDATA span the newlines Filebeat's multiline mode joined.
  # remove_field is a *common option* here: it only fires when the match
  # succeeds, so a _grokparsefailure event keeps its original "message"
  # instead of being silently stripped of all content (the previous
  # unconditional mutate removed it even on parse failure — data loss).
  grok {
    match => { "message" => "(?m)%{DATESTAMP:timestamp} %{INT:process_id} %{WORD:level} %{NOTSPACE:db_name} %{USERNAME:name}: %{GREEDYDATA:log_message}"}
    remove_field => [ "message" ]
  }

  # Use the application timestamp as the event time; drop the string copy
  # only when date parsing succeeded (common-option semantics again).
  date {
    match => [ "timestamp", "yy-MM-dd HH:mm:ss,SSS" ]
    timezone => "Etc/UTC"
    target => "@timestamp"
    remove_field => [ "timestamp" ]
  }

  # Lines starting with an IPv4 address are treated as HTTP access-log entries.
  if [log_message] =~ "^([0-9]{1,3}\.){3}[0-9]{1,3}" {
    # log_message is only removed when this inner grok succeeds, so a
    # failed access-line parse still retains its payload for inspection.
    grok {
      match => { "log_message" => "%{IPORHOST:request_clientip} %{USER:request_ident} %{USER:request_auth} \[%{GREEDYDATA:request_httpdate}\] \"(?:%{WORD:request_method} %{NOTSPACE:request_endpoint}(?: HTTP/%{NUMBER:request_httpversion})?|%{DATA:request_rawrequest})\" %{NUMBER:request_responsecode} (?:%{NUMBER:request_bytes}|-) %{GREEDYDATA:request_additionals}"}
      remove_field => [ "log_message" ]
    }
    # NOTE(review): the classic Apache HTTPDATE format is
    # "dd/MMM/yyyy:HH:mm:ss Z" (colon between date and time, plus a zone
    # offset) — confirm this pattern matches your actual timestamps; with
    # no zone given, the date filter assumes the local timezone.
    date {
      match => [ "request_httpdate", "dd/MMM/yyyy HH:mm:ss"]
      target => "request_httpdate"
    }
    # Tag the event and nest all request_* fields under [request].
    # (mutate applies rename before the common add_field/remove_field
    # options, so combining them in one block is safe.)
    mutate {
      add_field => { "type" => "access" }
      rename => {
        "request_clientip" => "[request][clientip]"
        "request_ident" => "[request][ident]"
        "request_auth" => "[request][auth]"
        "request_httpdate" => "[request][httpdate]"
        "request_method" => "[request][method]"
        "request_endpoint" => "[request][endpoint]"
        "request_httpversion" => "[request][httpversion]"
        "request_rawrequest" => "[request][rawrequest]"
        "request_responsecode" => "[request][responsecode]"
        "request_bytes" => "[request][bytes]"
        "request_additionals" => "[request][additionals]"
      }
    }
  }
  else {
    # Anything else is a plain application log event.
    mutate {
      add_field => { "type" => "log" }
    }
  }

  # Drop the Beats "host" object — presumably unwanted in the stored
  # document (it can also conflict with ECS mappings); confirm downstream.
  mutate {
    remove_field => ["host"]
  }
}
# Route events into per-type Elasticsearch data streams
# (logs-myapp-log and logs-myapp-access).
output {
  if [type] == "access" {
    elasticsearch {
      hosts => "localhost:9200"
      data_stream => "true"
      data_stream_type => "logs"
      data_stream_dataset => "myapp"
      data_stream_namespace => "access"
    }
  }
  else if [type] == "log" {
    elasticsearch {
      hosts => "localhost:9200"
      data_stream => "true"
      data_stream_type => "logs"
      data_stream_dataset => "myapp"
      data_stream_namespace => "log"
    }
    # Mirror plain log events to stdout for debugging.
    stdout { codec => rubydebug }
  }
}
My Logstash stdout looks like this (exactly how I want it):
{
"log" => {
"file" => {
"path" => "/var/log/myapp/applog.log"
},
"offset" => 74528864,
"flags" => [
[0] "multiline"
]
},
"ecs" => {
"version" => "1.8.0"
},
"@version" => "1",
"log_message" => "A \n very \n long \n multiline \n log",
"agent" => {
"type" => "filebeat",
"hostname" => "myhost",
"ephemeral_id" => "some id",
"version" => "7.13.2",
"name" => "myhost",
"id" => "another id"
},
"process_id" => "20104",
"input" => {
"type" => "log"
},
"level" => "TEST",
"type" => "log",
"db_name" => "my db name",
"@timestamp" => 2021-05-20T06:02:18.478Z,
"tags" => [
[0] "beats_input_codec_plain_applied"
],
"name" => "some field"
}
Up to this point everything works fine. The document is also written correctly to Elasticsearch, with the exception of the field "log_message", which is always empty for multiline logs.
As far as I can tell, only multiline logs are affected. I also think the Filebeat and Logstash parts work as intended, since the stdout shows the expected result.
It might be worth noting that I'm using Elasticsearch data streams as the output for Logstash.
Any ideas? Thanks in advance.