Hello, I am having a lot of trouble parsing a Cisco Meraki flow syslog message in Logstash.
A sample message looks like this:
Apr 25 13:12:41 gateway 1619356361.230223408 ip_flow_end src=10.1.130.45 dst=38.142.217.28 protocol=tcp sport=43640 dport=5671 translated_src_ip=199.119.119.6 translated_port=43640
I noticed the = signs in the log message, so I figured I should use a kv filter to parse it. I also have a mutate filter that adds tags to the different types of syslog messages that are received; this particular log message gets a tag called mx84.
Here is my Logstash filter:
filter {
  if "mx84" in [tags] {
    grok {
      match => {
        "message" => "%{GREEDYDATA:msg}"
      }
    }
    kv {
      source => "msg"
      value_split => "="
      trim => "\s"
      include_keys => [ "src", "dst", "protocol", "sport", "dport", "translated_src_ip", "translated_port" ]
      target => "kv"
    }
  }
}
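To make the goal concrete, here is a rough Python sketch of what I expect the kv filter to produce from the msg field (field names taken from my include_keys list above; this is just an illustration of the intended behavior, not Logstash code):

```python
# Illustration only: approximate the kv filter's key=value parsing in Python,
# using the sample Meraki flow message from above.
msg = ("1619356361.230223408 ip_flow_end src=10.1.130.45 dst=38.142.217.28 "
       "protocol=tcp sport=43640 dport=5671 "
       "translated_src_ip=199.119.119.6 translated_port=43640")

# Mirrors the include_keys option in my kv filter.
include_keys = {"src", "dst", "protocol", "sport", "dport",
                "translated_src_ip", "translated_port"}

kv = {}
for token in msg.split():
    if "=" in token:                       # only key=value tokens
        key, _, value = token.partition("=")
        if key in include_keys:            # mimic include_keys filtering
            kv[key] = value

print(kv)
```

In other words, I want each of those keys to end up as its own field under the kv target.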
When I deploy the Logstash Pipeline and paste the log, this is the output that I receive:
{
    "type" => "syslog",
    "syslog_severity_code" => 5,
    "syslog_severity" => "notice",
    "host" => "server1",
    "syslog_facility_code" => 1,
    "logsource" => "gateway",
    "@timestamp" => 2021-04-25T13:12:41.000Z,
    "syslog_facility" => "user-level",
    "tags" => [
        [0] "mx84"
    ],
    "@version" => "1",
    "message" => " 1619356361.230223408 ip_flow_end src=1.2.33.444 dst=22.222.222.22 protocol=tcp sport=43640 dport=5671 translated_src_ip=333.333.333.3 translated_port=43640",
    "logstash_received_log_at" => "2021-04-27T21:59:38.435Z"
}
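For what it's worth, the long number after the hostname is the Meraki flow timestamp in epoch seconds. Converting it in Python (purely as a sanity check, not part of my pipeline) shows it lines up with the @timestamp the header filter produced:

```python
from datetime import datetime, timezone

# Leading number from the Meraki message: epoch seconds with a fractional part.
meraki_epoch = 1619356361.230223408
dt = datetime.fromtimestamp(meraki_epoch, tz=timezone.utc)
print(dt.isoformat())  # matches the @timestamp above (2021-04-25T13:12:41 UTC)
```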
Goal: Parse "src", "dst", "protocol", "sport", "dport", "translated_src_ip", and "translated_port" into their own fields.
Also, for extra context: someone else created a Logstash filter to parse the syslog headers, which is what adds the logstash_received_log_at field. Here is that filter:
filter {
  if [type] == "syslog" {
    # Parse syslog header from the log
    syslog_pri { }
    # Get additional syslog header metadata
    grok {
      match => {
        "message" => [ "%{SYSLOGTIMESTAMP:timestamp} %{NOTSPACE:logsource} (%{SYSLOGPROG}: )?%{GREEDYDATA:message}" ]
      }
      overwrite => [ "message" ]
      add_field => [ "logstash_received_log_at", "%{@timestamp}" ]
      # add_field => [ "generated_at", "%{timestamp}" ]
      # add_field => [ "received_from", "%{host}" ]
    }
    date {
      match => [ "timestamp", "MMM d HH:mm:ss", "MMM dd HH:mm:ss" ]
      timezone => "UTC"
    }
    if "_dateparsefailure" not in [tags] {
      mutate {
        # replace => { "@timestamp" => "%{timestamp}" }
        remove_field => [ "timestamp" ]
      }
    }
  }
}
I have looked at the elastic.co documentation on grok and kv filters, but I am still stumped. Any help would be greatly appreciated!