Unable to Parse Meraki Flow Syslog Message with Grok and KV Filters in Logstash

Hello, I am having a lot of trouble trying to parse a Cisco Meraki flow syslog message in Logstash.

A sample message looks like this:

Apr 25 13:12:41 gateway 1619356361.230223408 ip_flow_end src=10.1.130.45 dst=38.142.217.28 protocol=tcp sport=43640 dport=5671 translated_src_ip=199.119.119.6 translated_port=43640

I noticed the = signs in the log message, so I figured I should use a kv filter to parse it. I also have a mutate filter that adds tags to the different types of syslog messages that are received; this particular message gets a tag called mx84.
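
I have not included that tagging filter here, but it is roughly of this shape (the logsource condition below is illustrative only, not the exact condition):

    filter {
            # Illustrative only: tag flow logs coming from the Meraki MX84 gateway.
            # The real filter may match on something else entirely.
            if [logsource] == "gateway" {
                    mutate { add_tag => [ "mx84" ] }
            }
    }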

Here is my Logstash filter:

filter {
            if "mx84" in [tags] {
                    grok {
                            match => {
                                    "message" => "%{GREEDYDATA:msg}"
                            }
                    }

                    kv {
                            source => "msg"
                            value_split => "="
                            trim => "\s"
                            include_keys => [ "src","dst","protocol","sport","dport","translated_src_ip","translated_po$
                            target => "kv"
                    }
            }
}

When I deploy the Logstash pipeline and feed it the sample log, this is the output I receive:

 "type" => "syslog",
        "syslog_severity_code" => 5,
             "syslog_severity" => "notice",
                        "host" => "server1",
        "syslog_facility_code" => 1,
                   "logsource" => "gateway",
                  "@timestamp" => 2021-04-25T13:12:41.000Z,
             "syslog_facility" => "user-level",
                        "tags" => [
        [0] "mx84"
    ],
                    "@version" => "1",
                     "message" => " 1619356361.230223408 ip_flow_end src=1.2.33.444 dst=22.222.222.22 protocol=tcp sport=43640 dport=5671 translated_src_ip=333.333.333.3 translated_port=43640",
    "logstash_received_log_at" => "2021-04-27T21:59:38.435Z"

Goal: Parse "src", "dst", "protocol", "sport", "dport", "translated_src_ip", and "translated_port" into their own fields.

Also, for extra context: someone else created a Logstash filter that parses the syslog headers, and that filter adds the logstash_received_log_at field. Here it is:

filter {
        if [type] == "syslog" {
                # Parse syslog header from the log
                syslog_pri { }

                # Get additional syslog header metadata
                grok {
                        match => { "message" => [ "%{SYSLOGTIMESTAMP:timestamp} %{NOTSPACE:logsource} (%{SYSLOGPROG}: )?%{GREEDYDATA:message}" ] }
                        overwrite => [ "message" ]
                        add_field => [ "logstash_received_log_at", "%{@timestamp}" ]
                        # add_field => [ "generated_at", "%{timestamp}" ]
                        # add_field => [ "received_from", "%{host}" ]
                }

                date {
                        match => [ "timestamp", "MMM d HH:mm:ss", "MMM  d HH:mm:ss", "MMM dd HH:mm:ss" ]
                        timezone => "UTC"
                }

                if "_dateparsefailure" not in [tags] {
                        mutate {
                                # replace => { "@timestamp" => "%{timestamp}" }
                                remove_field => [ "timestamp" ]
                        }
                }
        }
}

I have looked at the elastic.co documentation on grok and kv filters, but I am still stumped. Any help would be greatly appreciated!

I would use dissect, not grok.

    # Capture the three space-separated tokens of the syslog timestamp into one
    # field (%{+field} appends to the previous capture), then the host, the
    # epoch timestamp, and the rest of the line.
    dissect { mapping => { "message" => "%{[@metadata][ts]} %{+[@metadata][ts]} %{+[@metadata][ts]} %{computerName} %{anotherTimestamp} %{[@metadata][restOfLine]}" } }
    # Parse the key=value pairs out of the remainder.
    kv { source => "[@metadata][restOfLine]" }
    # The second field is a UNIX epoch timestamp with a fractional part.
    date { match => [ "anotherTimestamp", "UNIX" ] target => "anotherTimestamp" }
    # Parse the syslog header timestamp into @timestamp.
    date { match => [ "[@metadata][ts]", "MMM dd HH:mm:ss" ] }

This will get you events like

 "anotherTimestamp" => 2021-04-25T13:12:41.230Z,
              "dst" => "38.142.217.28",
         "protocol" => "tcp",
  "translated_port" => "43640",
     "computerName" => "gateway",
"translated_src_ip" => "199.119.119.6",
              "src" => "10.1.130.45",
            "sport" => "43640",
            "dport" => "5671"

If your dates do not have a year in them, then Logstash has to guess, and sometimes it guesses wrong; replaying last year's logs, for example, can get them stamped with the current year. See issues 137, 100, and the long discussion of 51.
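
One way to sidestep the guessing (a sketch, not part of the answer above) is to pin the year yourself before the final date filter runs, at the cost of hardcoding it:

    # Illustrative workaround: prepend a known year (hardcoded here) so the
    # date filter does not have to guess one.
    mutate { replace => { "[@metadata][ts]" => "2021 %{[@metadata][ts]}" } }
    date { match => [ "[@metadata][ts]", "yyyy MMM dd HH:mm:ss" ] }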

If you want to use "anotherTimestamp" as the @timestamp, then you could use

    dissect { mapping => { "message" => "%{} %{} %{} %{computerName} %{[@metadata][ts]} %{[@metadata][restOfLine]}" } }
    kv { source => "[@metadata][restOfLine]" }
    date { match => [ "[@metadata][ts]", "UNIX" ] }
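
Either variant is easy to sanity-check with a throwaway pipeline that reads the sample line from stdin and prints the parsed event. A minimal sketch using the second variant:

    # Test pipeline sketch: run with `bin/logstash -f test.conf` and paste the
    # sample syslog line on stdin.
    input { stdin {} }
    filter {
        dissect { mapping => { "message" => "%{} %{} %{} %{computerName} %{[@metadata][ts]} %{[@metadata][restOfLine]}" } }
        kv { source => "[@metadata][restOfLine]" }
        date { match => [ "[@metadata][ts]", "UNIX" ] }
    }
    output { stdout { codec => rubydebug } }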

So your include_keys line is missing the closing "]. I also suspect you need to escape the \s as \\s, since that is what the Filebeat and Elasticsearch processors require, though I'm not 100% sure whether the Logstash syntax needs it. You could also look at the Filebeat Cisco Meraki module, which parses everything for you.
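
For reference, a sketch of that kv block with both fixes applied (whether Logstash actually needs the doubled backslash is the open question above):

    kv {
            source => "msg"
            value_split => "="
            # Escaped as suggested above; note that newer versions of the kv
            # filter renamed this option to trim_value.
            trim => "\\s"
            # Closing "] restored; translated_port is taken from the goal list.
            include_keys => [ "src","dst","protocol","sport","dport","translated_src_ip","translated_port" ]
            target => "kv"
    }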

Thank you so much, Badger!!!! This worked wonderfully! If I could, I would send you a bottle of wine. :champagne:

Ah, that makes sense! I will definitely include this in my notes; I'm still learning about Logstash filters, so this will be very valuable.
