Hello Elastic Community,
I am currently working on configuring Logstash to correctly process error stacktraces from Kubernetes logs. My goal is to ensure that stacktraces are combined into a single event before further processing. Despite trying several configurations, I am still seeing stacktraces being split line by line.
Here is the current Logstash configuration that we have been trying to get working:
input {
  tcp {
    port => 5050
    # fluent-bit ships one JSON object per line, so json_lines is the correct
    # codec for this input.
    codec => json_lines
  }
  # NOTE(review): an input accepts exactly one codec, so a multiline codec
  # cannot be combined with json_lines here — and it would not help anyway:
  # each stacktrace line already arrives as a separate, self-contained JSON
  # event from fluent-bit, so there is nothing left for Logstash to join at
  # the transport layer. Reassemble stacktraces upstream with a fluent-bit
  # multiline parser (multiline.parser on the tail input), or use the
  # Logstash aggregate filter keyed on a correlation field.
}
filter {
  # fluent-bit puts the container log line in [log]; the rest of this
  # pipeline expects it in [message].
  mutate { rename => { "[log]" => "[message]" } }
  mutate { rename => { "[kubernetes][labels][app]" => "[kubernetes][labels][fluent]" } }
  # FIX: "[kubernetes.labels.app]" named a literal top-level field whose name
  # contains dots — field references use [a][b][c] syntax, so it never matched.
  # After the rename above the source field is already gone; this is defensive
  # cleanup for events where the rename did not apply.
  mutate { remove_field => [ "[kubernetes][labels][app]" ] }

  # Derive the cluster name from the node hostname (<site>-<env>-...).
  grok {
    break_on_match => true
    match => { "[kubernetes][host]" => "%{DATA:site}-%{WORD:env}-.*" }
    add_field => { "cluster" => "%{site}-%{env}" }
  }

  # Extract log level / trace ids from the application log line.
  # FIX: the original repeated the "message" key once per pattern (duplicate
  # hash keys are undefined config) and duplicated every pattern verbatim.
  # The documented form is one key with an array; patterns are tried in order
  # and the first match wins (break_on_match). Also escaped the literal
  # brackets in "\[,,\]" — unescaped, [,,] is a regex character class that
  # matches a single comma.
  grok {
    break_on_match => true
    match => {
      "message" => [
        "%{TIMESTAMP_ISO8601} %{DATA:log_level} \[%{DATA:ms},%{DATA:traceid},.*\] .*\[requesterId=%{GREEDYDATA:requesterid}, clientId=%{GREEDYDATA:clientid}, .*, apiKey=%{GREEDYDATA:apikey}\]",
        "%{TIMESTAMP_ISO8601} %{DATA:log_level} \[%{DATA:ms},%{DATA:traceid},.*\]",
        "%{TIMESTAMP_ISO8601} %{DATA:log_level} \[,,\] 1 ---",
        "%{TIMESTAMP_ISO8601} %{DATA:log_level} 1"
      ]
    }
  }

  # Stacktrace continuation lines carry no timestamp/level, so grok leaves
  # log_level unset; tag those ERROR for the relevant namespace.
  if (("****" in [kubernetes][namespace_name]) and (![log_level])) {
    mutate { add_field => { "log_level" => "ERROR" } }
  }
}
output {
  # if "****" in [cluster] {
  elasticsearch {
    hosts => ["****", "****", "*****"]
    ssl => true
    # NOTE(review): cacert expects a single path, not an array — confirm this
    # parses on your Logstash version (values are redacted here).
    cacert => ["*****"]
    api_key => "*****"
    # FIX: `index` was set together with ilm_rollover_alias. With ILM enabled,
    # the write index is controlled by the rollover alias, and recent versions
    # of the elasticsearch output reject the ambiguous combination, so the
    # `index` option was removed. Date-based rollover comes from the ILM
    # policy, not from a date pattern in the index name.
    ilm_rollover_alias => "*****t"
    ilm_policy => "******"
  }
  # }
}
But nothing seems to work: the stacktraces still appear as multiple events instead of a single combined event.
Could someone please help me identify what I might be doing wrong? Are there any specific configurations or patterns I should be using for stacktraces?
Logs are sent from Kubernetes to Logstash/ELK via Fluent Bit.
Thank you in advance for your help!