Configure logstash to show logs from port 5002

pfSense logs have a fixed header followed by a variety of CSV entries. (Format documented here.) I would strip the fixed header using dissect rather than grok (the tooling in logstash has improved a lot in the last 6 years):

    dissect { mapping => { "message" => "<%{syslog_pri}>%{[@metadata][ts]} %{+[@metadata][ts]} %{+[@metadata][ts]} filterlog: %{restOfLine}" } }
    date { match => [ "[@metadata][ts]", "MMM dd HH:mm:ss" ] }
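
To see what that mapping pulls apart, here is a rough plain-Ruby stand-in that can be run outside logstash. It is only a sketch: the regex approximates the dissect mapping, `parse_header` is a hypothetical helper name, and the `facility`/`severity` fields are an extra illustration of how a syslog PRI value decomposes (they are not produced by the dissect filter above).

```ruby
# Regex stand-in for the dissect mapping: <pri>, three space-separated
# timestamp tokens, the literal "filterlog: ", then everything else.
# Assumes single spaces between the timestamp tokens.
HEADER = /\A<(?<pri>\d+)>(?<ts>\S+ \S+ \S+) filterlog: (?<rest>.*)\z/

# Hypothetical helper: returns a Hash of header fields, or nil on no match.
def parse_header(line)
  m = HEADER.match(line)
  return nil unless m

  pri = m[:pri].to_i
  {
    "syslog_pri" => m[:pri],
    "facility"   => pri / 8, # syslog PRI = facility * 8 + severity
    "severity"   => pri % 8,
    "ts"         => m[:ts],
    "restOfLine" => m[:rest]
  }
end

header = parse_header("<134>Sep 29 09:03:57 filterlog: 5,,,1000000103,igb3,match,block,in,4")
puts header.inspect
```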

If your dates do not have a year in them then logstash will guess, and sometimes it will guess wrong. See issues 137, 100, and the long discussion of 51.
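
To make the year problem concrete, here is a small Ruby sketch of one possible guessing heuristic. This is an assumption for illustration, not a description of what the date filter actually does: it tries the current year and steps back one year if that would put the event more than a day in the future. `guess_year` is a hypothetical helper, and it treats the timestamp as UTC for simplicity.

```ruby
require "date"

MONTHS = Date::ABBR_MONTHNAMES # [nil, "Jan", "Feb", ..., "Dec"]

# Hypothetical helper: attach a year to a timestamp like "Sep 29 09:03:57".
# `now` is passed in so the behaviour is easy to test.
def guess_year(ts, now)
  mon, day, clock = ts.split
  month = MONTHS.index(mon)
  raise ArgumentError, "unknown month #{mon.inspect}" unless month

  h, m, s = clock.split(":").map(&:to_i)
  candidate = Time.utc(now.year, month, day.to_i, h, m, s)
  # An event well in the future most likely belongs to the previous year.
  if candidate > now + 86_400
    Time.utc(now.year - 1, month, day.to_i, h, m, s)
  else
    candidate
  end
end

# A December event processed just after New Year lands in the previous year.
puts guess_year("Dec 31 23:59:58", Time.utc(2021, 1, 1, 0, 0, 5))
```

Any heuristic like this can still be wrong, for example when old logs are replayed months later, which is why a timestamp format that includes the year is preferable where you can get one.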

If you literally just want to extract the IPs then you could do it using grok

    grok { match => { "restOfLine" => "%{IP:srcIP},%{IP:dstIP}" } }
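
For trying the idea outside logstash, a rough Ruby equivalent looks like the sketch below. It is narrower than the grok pattern: grok's `IP` also covers IPv6, while this regex is IPv4-only and does not check that each octet is in range. `extract_ips` is a hypothetical helper name.

```ruby
# Loose IPv4 pattern: four dot-separated groups of 1-3 digits.
IPV4 = /\d{1,3}(?:\.\d{1,3}){3}/
# Two addresses separated by a comma, as they appear in the CSV payload.
PAIR = /(?<srcIP>#{IPV4}),(?<dstIP>#{IPV4})/

# Hypothetical helper: returns the first adjacent address pair, or nil.
def extract_ips(rest_of_line)
  m = PAIR.match(rest_of_line)
  m && { "srcIP" => m[:srcIP], "dstIP" => m[:dstIP] }
end

line = "5,,,1000000103,igb3,match,block,in,4,0x0,,64,0,0,none,17,udp,134,177.85.233.73,255.255.255.255,5678,5678,114"
puts extract_ips(line).inspect
```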

If you want to do the job properly, then you need to pull out a set of fields, label them, and determine what the next set of fields is. My initial approach was to use dissect again. I got to this point and realized I still had far to go:

    # dissect will not parse ",," using ",%{fieldName},", so replace it with something we can later delete
    mutate { gsub => [ "restOfLine", ",,", ",!!!," ] }

    dissect { mapping => { "restOfLine" =>  "%{rule},%{subRule},%{anchor},%{tracker},%{interface},%{reason},%{action},%{direction},%{ipVersion},%{restOfLine}" } }
    if "_dissectfailure" not in [tags] {
        if [ipVersion] == "4" {
            dissect { mapping => { "restOfLine" =>  "%{tos},%{ecn},%{ttl},%{id},%{offset},%{flags},%{protocol},%{protocolName},%{restOfLine}" } }
        } else if [ipVersion] == "6" {
            # Never seen one of these, "protocol" could be the name, in which case protocol and protocolId should change to be like V4
            dissect { mapping => { "restOfLine" =>  "%{class},%{flowLabel},%{hopLimit},%{protocol},%{protocolId},%{restOfLine}" } }
        } else {
            mutate { add_field => { "parseFailure" => "unknown IP version" } }
        }

        if [ipVersion] in [ "4", "6" ] {
            dissect { mapping => { "restOfLine" =>  "%{length},%{srcIp},%{dstIp},%{restOfLine}" } }
            if [protocol] in [ "6", "17" ] {
                # TCP and UDP
                dissect { mapping => { "restOfLine" =>  "%{srcPort},%{dstPort},%{dataLength},%{restOfLine}" } }
            } else if [protocol] == "112" {
                # CARP
                dissect { mapping => { "restOfLine" =>  "%{type},%{carpTtl},%{vhid},%{carpVersion},%{advskew},%{advbase},%{restOfLine}" } }
            }
            if [protocol] == "6" {
                dissect { mapping => { "restOfLine" =>  "%{tcpFlags},%{tcpSeq},%{tcpAck},%{tcpWindow},%{tcpUrg},%{tcpOptions}" } }
            } else if [protocol] == "1" {
                # ICMP
                dissect { mapping => { "restOfLine" =>  "%{icmpType},%{restOfLine}" } }
                # Lots more code to dissect each of these ...
            }
        }
    }

Note that that code is incomplete and fails to parse UDP correctly because 'if [protocol] in [ "6", "17" ] {' needs to be two separate branches...
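
The UDP problem can be illustrated in plain Ruby with a regex analogy (an approximation of how the delimiters behave, not dissect itself): the shared mapping expects a comma after dataLength, and a UDP entry ends there. The second half shows a related wrinkle in the `,,` workaround, assuming mutate's gsub behaves like Ruby's `String#gsub`: the string is scanned once, so a run of three commas still leaves one `,,` behind.

```ruby
# Regex analogy for "%{srcPort},%{dstPort},%{dataLength},%{restOfLine}":
# three comma-terminated fields, then the remainder.
SHARED = /\A([^,]*),([^,]*),([^,]*),(.*)\z/

tcp_tail = "51333,443,0,S,2184934504,,8192,,mss;nop;wscale;nop;nop;sackOK"
udp_tail = "5678,5678,114" # nothing follows dataLength

tcp_ok = !SHARED.match(tcp_tail).nil?
udp_ok = !SHARED.match(udp_tail).nil?
puts "tcp matches: #{tcp_ok}, udp matches: #{udp_ok}"

# Single left-to-right pass: the middle comma is consumed by the first
# replacement, so the second empty field is not padded.
padded = "5,,,1000000103".gsub(",,", ",!!!,")
puts padded
```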

I came up with a different solution... Create a ruby script file that contains

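    # Called once at startup with the script_params from the ruby filter.
    def register(params)
        @keys = params['keys']       # field names to assign, in order
        @values = params['values']   # name of the field holding the array
    end

    # Called for every event; must return an array of events.
    def filter(event)
        values = event.get(@values)
        # Take one value off the front of the array for each key
        @keys.each { |k|
            event.set(k, values.shift)
        }
        # Write back whatever is left for the next stage to consume
        event.set(@values, values)
        [event]
    end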

You call that using a ruby filter like

        ruby {
            path => "/home/user/addFields.rb"
            script_params => {
                "values" => "restOfLine"
                "keys" => [ "tos", "ecn", "ttl", "id", "offset", "flags", "protocol", "protocolName" ]
            }
        }

That assumes that restOfLine is an array that contains values. It will remove entries from the array for each of the keys and add them to the event.
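
The shifting behaviour is easy to check outside logstash. The sketch below uses a plain Hash as a stand-in for the event (an assumption made for illustration; a real event is accessed through `event.get` and `event.set`), and `add_fields` is a hypothetical name for the same loop.

```ruby
# Same idea as the script's filter method, applied to a plain Hash.
def add_fields(event, values_field, keys)
  values = event[values_field]
  # Each key consumes the next value from the front of the array.
  keys.each { |k| event[k] = values.shift }
  event
end

event = { "restOfLine" => "0x0,,64,0,0,none,17,udp,134".split(",") }
add_fields(event, "restOfLine", %w[tos ecn ttl id offset flags protocol protocolName])
puts event.inspect
```

After the call, the eight IPv4 fields are set on the hash and `restOfLine` holds only the leftover `"134"`, ready for the next group of keys.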

Using that like this

    dissect { mapping => { "message" => "<%{syslog_pri}>%{[@metadata][ts]} %{+[@metadata][ts]} %{+[@metadata][ts]} filterlog: %{restOfLine}" } }
    date { match => [ "[@metadata][ts]", "MMM dd HH:mm:ss" ] }

    mutate { split => { "restOfLine" => "," } }

    ruby {
        path => "/home/user/addFields.rb"
        script_params => {
            "values" => "restOfLine"
            "keys" => [ "rule", "subrule", "anchor", "tracker", "interface", "reason", "action", "direction", "ipVersion" ]
        }
    }

    if [ipVersion] == "4" {
        ruby {
            path => "/home/user/addFields.rb"
            script_params => {
                "values" => "restOfLine"
                "keys" => [ "tos", "ecn", "ttl", "id", "offset", "flags", "protocol", "protocolName" ]
            }
        }
    } else if [ipVersion] == "6" {
        # Never seen one of these, "protocol" could be the name, in which case protocol and protocolId should change to be like V4
        ruby {
            path => "/home/user/addFields.rb"
            script_params => {
                "values" => "restOfLine"
                "keys" => [ "class", "flowLabel", "hopLimit", "protocol", "protocolId" ]
            }
        }
    } else {
        mutate { add_field => { "parseFailure" => "unknown IP version" } }
    }

    if [ipVersion] in [ "4", "6" ] {
        ruby {
            path => "/home/user/addFields.rb"
            script_params => {
                "values" => "restOfLine"
                "keys" => [ "length", "srcIp", "dstIp" ]
            }
        }

        if [protocol] == "1" {
            # ICMP
            ruby {
                path => "/home/user/addFields.rb"
                script_params => {
                    "values" => "restOfLine"
                    "keys" => [ "icmpType" ]
                }
            }
            if [icmpType] == "0" {
                # Echo reply
                # I am not going to write all these!
            } else if [icmpType] == "3" {
                # Destination unreachable
                # Need to parse the codes etc
            }
            # ... and the other types
        } else if [protocol] == "6" {
            # TCP
            ruby {
                path => "/home/user/addFields.rb"
                script_params => {
                    "values" => "restOfLine"
                    "keys" => [ "srcPort", "dstPort", "dataLength", "tcpFlags", "tcpSeq", "tcpAck", "tcpWindow", "tcpUrg", "tcpOptions" ]
                }
            }
        } else if [protocol] == "17" {
            # UDP
            ruby {
                path => "/home/user/addFields.rb"
                script_params => {
                    "values" => "restOfLine"
                    "keys" => [ "srcPort", "dstPort", "dataLength" ]
                }
            }
        } else if [protocol] == "112" {
            # CARP
            ruby {
                path => "/home/user/addFields.rb"
                script_params => {
                    "values" => "restOfLine"
                    "keys" => [ "type", "carpTtl", "vhid", "carpVersion", "advskew", "advbase" ]
                }
            }
        }
    }

That will get you UDP events like

{
      "anchor" => "",
   "interface" => "igb3",
         "tos" => "0x0",
        "rule" => "5",
    "protocol" => "17",
"protocolName" => "udp",
     "dstPort" => "5678",
     "tracker" => "1000000103",
          "id" => "0",
     "srcPort" => "5678",
       "srcIp" => "177.85.233.73",
       "dstIp" => "255.255.255.255",
         "ttl" => "64",
      "offset" => "0",
   "direction" => "in",
      "reason" => "match",
       "flags" => "none",
     "subrule" => "",
     "message" => "<134>Sep 29 09:03:57 filterlog: 5,,,1000000103,igb3,match,block,in,4,0x0,,64,0,0,none,17,udp,134,177.85.233.73,255.255.255.255,5678,5678,114",
      "length" => "134",
    "@version" => "1",
         "ecn" => "",
  "@timestamp" => 2020-09-29T13:03:57.000Z,
  "syslog_pri" => "134",
   "ipVersion" => "4",
      "action" => "block",
  "restOfLine" => [],
  "dataLength" => "114"
}

and TCP events like

{
      "anchor" => "",
   "interface" => "igb1",
      "tcpSeq" => "2184934504",
         "tos" => "0x0",
        "rule" => "5",
    "protocol" => "6",
"protocolName" => "tcp",
     "dstPort" => "443",
     "tracker" => "1000000103",
          "id" => "10381",
     "srcPort" => "51333",
       "srcIp" => "10.0.5.22",
       "dstIp" => "62.67.238.152",
      "tcpAck" => "",
         "ttl" => "128",
   "tcpWindow" => "8192",
  "tcpOptions" => "mss;nop;wscale;nop;nop;sackOK",
      "offset" => "0",
   "direction" => "in",
      "reason" => "match",
       "flags" => "DF",
     "subrule" => "",
     "message" => "<134>Sep 29 09:03:57 filterlog: 5,,,1000000103,igb1,match,block,in,4,0x0,,128,10381,0,DF,6,tcp,52,10.0.5.22,62.67.238.152,51333,443,0,S,2184934504,,8192,,mss;nop;wscale;nop;nop;sackOK",
      "length" => "52",
      "tcpUrg" => "",
    "@version" => "1",
         "ecn" => "",
  "@timestamp" => 2020-09-29T13:03:57.000Z,
  "syslog_pri" => "134",
   "ipVersion" => "4",
      "action" => "block",
  "restOfLine" => [],
  "dataLength" => "0",
    "tcpFlags" => "S"
}
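
The field assignments in those events can be reproduced with a compact plain-Ruby sketch of the same peel-off-by-group logic. It is a simplification for experimenting outside logstash: it only covers IPv4 with TCP or UDP, it uses a Hash instead of a logstash event, and the constant and function names are hypothetical.

```ruby
COMMON_KEYS = %w[rule subrule anchor tracker interface reason action direction ipVersion]
IPV4_KEYS   = %w[tos ecn ttl id offset flags protocol protocolName]
ADDR_KEYS   = %w[length srcIp dstIp]
PROTOCOL_KEYS = {
  "6"  => %w[srcPort dstPort dataLength tcpFlags tcpSeq tcpAck tcpWindow tcpUrg tcpOptions],
  "17" => %w[srcPort dstPort dataLength]
}

# Hypothetical helper: parse the CSV part of a filterlog line into a Hash.
def parse_filterlog(csv)
  values = csv.split(",", -1) # -1 keeps trailing empty fields
  event = {}
  take = ->(keys) { keys.each { |k| event[k] = values.shift } }

  take.call(COMMON_KEYS)
  # Only IPv4 is handled here; anything else keeps its fields unparsed.
  return event.merge("restOfLine" => values) unless event["ipVersion"] == "4"

  take.call(IPV4_KEYS)
  take.call(ADDR_KEYS)
  take.call(PROTOCOL_KEYS.fetch(event["protocol"], []))
  event.merge("restOfLine" => values)
end

tcp = parse_filterlog("5,,,1000000103,igb1,match,block,in,4,0x0,,128,10381,0,DF,6,tcp,52,10.0.5.22,62.67.238.152,51333,443,0,S,2184934504,,8192,,mss;nop;wscale;nop;nop;sackOK")
puts tcp.inspect
```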

I leave it as an exercise for the reader to add IPv6 handling, ICMP handling, and error handling.
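
As a possible starting point for the error handling, here is a hedged sketch of the script with a guard added: if the source field is not an array, or has fewer entries than there are keys, the event is tagged and left untouched. The tag name `_addfields_failure` is made up for this example, and `FakeEvent` is only a minimal stand-in so the sketch runs outside logstash (inside logstash you would drop that class and use the real event).

```ruby
# Minimal stand-in for a logstash event (get/set/tag only).
class FakeEvent
  def initialize(data)
    @data = data
  end

  def get(field)
    @data[field]
  end

  def set(field, value)
    @data[field] = value
  end

  def tag(name)
    (@data["tags"] ||= []) << name
  end
end

def register(params)
  @keys = params["keys"]
  @values = params["values"]
end

def filter(event)
  values = event.get(@values)
  # Guard: need an array with at least one value per key.
  unless values.is_a?(Array) && values.length >= @keys.length
    event.tag("_addfields_failure") # hypothetical tag name
    return [event]
  end
  @keys.each { |k| event.set(k, values.shift) }
  event.set(@values, values)
  [event]
end

register("keys" => %w[srcPort dstPort dataLength], "values" => "restOfLine")
short = FakeEvent.new("restOfLine" => ["5678"])
filter(short)
puts short.get("tags").inspect
```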
