Hello,
I am in the process of learning Logstash and am parsing some particular logs, but for some reason not all of them are parsed as they should be. I would be very grateful for help figuring out what I am doing wrong.
With a config like this:
input {
  tcp {
    host => "..."
    port => 5001
    type => "syslog"
  }
  udp {
    host => "..."
    port => 5001
    type => "syslog"
  }
}
filter {
  if [type] == "syslog" {
    grok {
      break_on_match => false
      match => [ "message", "<%{NONNEGINT:syslog_pri}> %{SYSLOGTIMESTAMP:syslog_timestamp} %{SYSLOGHOST:syslog_hostname} %{DATA:syslog_program}(?:\[%{POSINT:syslog_pid}\]) ?: %{GREEDYDATA:syslog_message}" ]
      add_field => [ "received_at", "%{@timestamp}" ]
      add_field => [ "received_from", "%{host}" ]
    }
  }
  syslog_pri { }
  if "_grokparsefailure" in [tags] {
    mutate {
      add_tag => "cannot_be_parsed_by_main_filter"
    }
  }
  mutate {
    rename => { "syslog_message" => "message" }
  }
  date {
    match => [ "syslog_timestamp", "ISO8601", "MMM  d HH:mm:ss", "MMM d HH:mm:ss" ]
    remove_field => [ "syslog_timestamp" ]
  }
}
filter {
  if "cannot_be_parsed_by_main_filter" in [tags] {
    grok {
      remove_tag => [ "_grokparsefailure" ]
      match => { "message" => "%{SYSLOGTIMESTAMP:syslog_timestamp} %{SYSLOGHOST:syslog_hostname} %{DATA:syslog_program} ?: %{GREEDYDATA:syslog_message}" }
      add_field => [ "received_at", "%{@timestamp}" ]
      add_field => [ "received_from", "%{host}" ]
    }
  }
  if "_grokparsefailure" in [tags] {
    mutate {
      remove_tag => [ "cannot_be_parsed_by_main_filter" ]
      remove_tag => [ "_grokparsefailure" ]
      add_tag => [ "cannot_be_parsed_by_added_filter_1" ]
    }
  }
  mutate {
    rename => { "syslog_message" => "message" }
  }
}
filter {
  if "cannot_be_parsed_by_added_filter_1" in [tags] {
    grok {
      remove_tag => [ "cannot_be_parsed_by_added_filter_1" ]
      match => { "message" => "%{SYSLOGTIMESTAMP:syslog_timestamp} %{SYSLOGHOST:syslog_hostname} %{DATA:syslog_program}\(/etc/cron\.hourly\)\[%{POSINT:syslog_pid}] ?: %{GREEDYDATA:syslog_message}" }
      add_field => [ "received_at", "%{@timestamp}" ]
      add_field => [ "received_from", "%{host}" ]
    }
  }
  if "_grokparsefailure" in [tags] {
    mutate {
      remove_tag => [ "cannot_be_parsed_by_added_filter_1" ]
      remove_tag => [ "_grokparsefailure" ]
      add_tag => [ "cannot_be_parsed_by_added_filter_2" ]
    }
  }
  mutate {
    rename => { "syslog_message" => "message" }
  }
}
output {
  elasticsearch {
    hosts => [ "..." ]
    index => 'logstash-%{+YYYY.MM.dd}'
  }
}
The second additional filter doesn't work at all, and for some reason most logs are not parsed by the main filter either.
I also only ever get user and notice as the facility and severity.
Log samples:
1). For some reason these logs are not parsed by the main filter, but by the first additional one:
Apr 25 20:11:46 nkdk22 slurmd[1234]: Launching batch job 3540 for UID 12345
Apr 25 20:15:12 head-testing xcat[12345]: xCAT: Allowing lsdef -t osimage for admin from localhost
2). Logs like this one are sometimes parsed by the main filter and sometimes by the first additional filter, and I don't understand why, or how the difference arises within the pipeline:
Apr 25 20:19:25 n22p123 OpenSM[1234]: Errors during initialization
3). Logs like this one are not parsed by the second additional filter. I'm not even sure the second additional filter is reachable, but I don't know why:
Apr 25 15:20:31 nkdk12 run-parts(/etc/cron.hourly)[1234 finished mcelog.cron
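To sanity-check the patterns outside of Logstash, I tried approximating the main and first additional grok patterns with plain Python regexes. These are rough stand-ins: the TS and HOST regexes below are my own simplifications of SYSLOGTIMESTAMP and SYSLOGHOST, not the real grok definitions, so this only shows the broad shape of the matching.

```python
import re

# Rough stand-ins for the grok patterns (my simplifications, not the real definitions)
TS = r"[A-Z][a-z]{2} +\d{1,2} \d{2}:\d{2}:\d{2}"  # ~ SYSLOGTIMESTAMP
HOST = r"[\w.\-]+"                                # ~ SYSLOGHOST

# Main filter pattern: requires a leading "<pri>" and a mandatory "[pid]"
main = re.compile(rf"<\d+> {TS} {HOST} .*?(?:\[\d+\]) ?: .*")
# First additional filter pattern: no pri, no pid
extra1 = re.compile(rf"{TS} {HOST} .*? ?: .*")

samples = [
    "Apr 25 20:11:46 nkdk22 slurmd[1234]: Launching batch job 3540 for UID 12345",
    "Apr 25 20:19:25 n22p123 OpenSM[1234]: Errors during initialization",
]

for line in samples:
    # With these simplified regexes, the samples as pasted above have no
    # "<pri>" prefix, so main fails and only extra1 matches.
    print(bool(main.match(line)), bool(extra1.match(line)))
```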
Thank you for your help!