pfSense logs have a fixed header followed by a variable set of CSV fields. (Format documented here.) I would strip the fixed header using dissect rather than grok (the tooling in logstash has improved a lot in the last 6 years):
dissect { mapping => { "message" => "<%{syslog_pri}>%{[@metadata][ts]} %{+[@metadata][ts]} %{+[@metadata][ts]} filterlog: %{restOfLine}" } }
date { match => [ "[@metadata][ts]", "MMM dd HH:mm:ss" ] }
If your dates do not have a year in them then logstash will guess, and sometimes it will guess wrong. See issues 137, 100, and the long discussion of 51.
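To illustrate why a missing year is a problem, here is a small Ruby sketch of the kind of guess involved. It is not the date filter's code: `guess_year` and `parse_without_year` are hypothetical helpers, and the rule they apply (assume the event happened within about a month of "now") is an assumption made for illustration.

```ruby
require 'time'

# Hypothetical helper: choose a year for a timestamp that has none.
# Assumption: events are recent, so a December event seen in January belongs
# to the previous year, and a January event seen in December to the next one.
def guess_year(event_month, now)
  if event_month == 12 && now.month == 1
    now.year - 1
  elsif event_month == 1 && now.month == 12
    now.year + 1
  else
    now.year
  end
end

# Hypothetical helper: parse "MMM dd HH:mm:ss" text and attach the guessed year.
def parse_without_year(text, now)
  # Parse with a placeholder year, then replace it with the guess.
  parsed = Time.strptime("2000 #{text}", "%Y %b %d %H:%M:%S")
  Time.new(guess_year(parsed.month, now), parsed.month, parsed.day,
           parsed.hour, parsed.min, parsed.sec)
end
```

With a rule like this, a log line written on Sep 29 2020 but replayed in March 2023 is stamped 2023, which is exactly the sort of wrong guess to watch for when ingesting old files.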
If you literally just want to extract the IPs, you could do it using grok:
grok { match => { "restOfLine" => "%{IP:srcIP},%{IP:dstIP}" } }
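For comparison, here is roughly the same idea outside logstash: a Ruby sketch that finds the first pair of adjacent, comma-separated IPv4 addresses in the CSV payload. `IPV4`, `PAIR` and `extract_ips` are names invented here. The pattern is a simplification: it is IPv4-only and does not validate octet ranges, whereas grok's IP pattern also covers IPv6.

```ruby
# Simplified IPv4 shape: four dot-separated groups of 1-3 digits (no range check).
IPV4 = /\d{1,3}(?:\.\d{1,3}){3}/
# Two addresses separated by a comma, not glued to other digits or dots.
PAIR = /(?<![\d.])(#{IPV4}),(#{IPV4})(?![\d.])/

# Hypothetical helper: returns a hash with srcIP/dstIP, or nil if no pair is found.
def extract_ips(rest_of_line)
  m = PAIR.match(rest_of_line)
  m && { "srcIP" => m[1], "dstIP" => m[2] }
end
```

Like the grok version, this only works because the source and destination addresses sit next to each other in the line; it tells you nothing about the other fields.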
If you want to do the job properly, then you need to pull out a set of fields, label them, and determine what the next set of fields are. My initial approach was to use dissect again. I got as far as this before realizing how far there still was to go:
# dissect will not parse ",," using ",%{fieldName},", so replace it with something we can later delete
mutate { gsub => [ "restOfLine", ",,", ",!!!," ] }
dissect { mapping => { "restOfLine" => "%{rule},%{subRule},%{anchor},%{tracker},%{interface},%{reason},%{action},%{direction},%{ipVersion},%{restOfLine}" } }
if "_dissectfailure" not in [tags] {
if [ipVersion] == "4" {
dissect { mapping => { "restOfLine" => "%{tos},%{ecn},%{ttl},%{id},%{offset},%{flags},%{protocol},%{protocolName},%{restOfLine}" } }
} else if [ipVersion] == "6" {
# Never seen one of these, "protocol" could be the name, in which case protocol and protocolId should change to be like V4
dissect { mapping => { "restOfLine" => "%{class},%{flowLabel},%{hopLimit},%{protocol},%{protocolId},%{restOfLine}" } }
} else {
mutate { add_field => { "parseFailure" => "unknown IP version" } }
}
if [ipVersion] in [ "4", "6" ] {
dissect { mapping => { "restOfLine" => "%{length},%{srcIp},%{dstIp},%{restOfLine}" } }
if [protocol] in [ "6", "17" ] {
# TCP and UDP
dissect { mapping => { "restOfLine" => "%{srcPort},%{dstPort},%{dataLength},%{restOfLine}" } }
} else if [protocol] == "112" {
# CARP
dissect { mapping => { "restOfLine" => "%{type},%{carpTtl},%{vhid},%{carpVersion},%{advskew},%{advbase},%{restOfLine}" } }
}
if [protocol] == "6" {
dissect { mapping => { "restOfLine" => "%{tcpFlags},%{tcpSeq},%{tcpAck},%{tcpWindow},%{tcpUrg},%{tcpOptions}" } }
} else if [protocol] == "1" {
# ICMP
dissect { mapping => { "restOfLine" => "%{icmpType},%{restOfLine}" } }
# Lots more code to dissect each of these ...
}
}
}
Note that this code is incomplete, and it fails to parse UDP correctly: 'if [protocol] in [ "6", "17" ] {' needs to be two separate branches, because the shared dissect pattern expects more fields after dataLength and a UDP line has none.
I came up with a different solution. Create a ruby script file that contains
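# Called once at startup with the script_params from the ruby filter
def register(params)
@keys = params['keys']
@values = params['values']
end
# Called for each event
def filter(event)
# The array of values that have not been consumed yet
values = event.get(@values)
# Take one value off the front of the array for each key, in order
@keys.each { |k|
event.set(k, values.shift)
}
# Write the shortened array back so the next call continues from there
event.set(@values, values)
[event]
end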
You call that using a ruby filter like
ruby {
path => "/home/user/addFields.rb"
script_params => {
"values" => "restOfLine"
"keys" => [ "tos", "ecn", "ttl", "id", "offset", "flags", "protocol", "protocolName" ]
}
}
That assumes that restOfLine is an array that contains values. It will remove entries from the array for each of the keys and add them to the event.
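To make the shifting behaviour concrete, here is a plain Ruby sketch that can be run outside logstash. `FakeEvent` is a minimal stand-in invented here (it only mimics get and set), and `take_fields` is a hypothetical name for the same logic as the script's filter method, with the field name and keys passed as arguments.

```ruby
# Minimal stand-in for a logstash event, for illustration only.
class FakeEvent
  def initialize(fields)
    @fields = fields
  end

  # Return a copy of arrays so callers must set() to persist changes.
  def get(name)
    value = @fields[name]
    value.is_a?(Array) ? value.dup : value
  end

  def set(name, value)
    @fields[name] = value
  end
end

# Same logic as the script's filter method.
def take_fields(event, values_field, keys)
  values = event.get(values_field)
  keys.each { |k| event.set(k, values.shift) }
  event.set(values_field, values)
  [event]
end

event = FakeEvent.new("restOfLine" => "5,,,1000000103,igb3".split(","))
take_fields(event, "restOfLine", ["rule", "subrule", "anchor"])
puts event.get("rule").inspect        # "5"
puts event.get("subrule").inspect     # ""
puts event.get("restOfLine").inspect  # ["1000000103", "igb3"]
```

One thing to keep in mind: in plain Ruby, `String#split` keeps empty fields in the middle of a line but drops trailing empty ones unless it is given a negative limit, so a line ending in empty fields yields a shorter array and `shift` returns nil for the missing keys. Whether the split in mutate behaves the same way is worth checking against your own data.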
Using the script in a full pipeline like this
dissect { mapping => { "message" => "<%{syslog_pri}>%{[@metadata][ts]} %{+[@metadata][ts]} %{+[@metadata][ts]} filterlog: %{restOfLine}" } }
date { match => [ "[@metadata][ts]", "MMM dd HH:mm:ss" ] }
mutate { split => { "restOfLine" => "," } }
ruby {
path => "/home/user/addFields.rb"
script_params => {
"values" => "restOfLine"
"keys" => [ "rule", "subrule", "anchor", "tracker", "interface", "reason", "action", "direction", "ipVersion" ]
}
}
if [ipVersion] == "4" {
ruby {
path => "/home/user/addFields.rb"
script_params => {
"values" => "restOfLine"
"keys" => [ "tos", "ecn", "ttl", "id", "offset", "flags", "protocol", "protocolName" ]
}
}
} else if [ipVersion] == "6" {
# Never seen one of these, "protocol" could be the name, in which case protocol and protocolId should change to be like V4
ruby {
path => "/home/user/addFields.rb"
script_params => {
"values" => "restOfLine"
"keys" => [ "class", "flowLabel", "hopLimit", "protocol", "protocolId" ]
}
}
} else {
mutate { add_field => { "parseFailure" => "unknown IP version" } }
}
if [ipVersion] in [ "4", "6" ] {
ruby {
path => "/home/user/addFields.rb"
script_params => {
"values" => "restOfLine"
"keys" => [ "length", "srcIp", "dstIp" ]
}
}
if [protocol] == "1" {
# ICMP
ruby {
path => "/home/user/addFields.rb"
script_params => {
"values" => "restOfLine"
"keys" => [ "icmpType" ]
}
}
if [icmpType] == "0" {
# Echo reply
# I am not going to write all these!
} else if [icmpType] == "3" {
# Destination unreachable
# Need to parse the codes etc
}
# ... and the other types
} else if [protocol] == "6" {
# TCP
ruby {
path => "/home/user/addFields.rb"
script_params => {
"values" => "restOfLine"
"keys" => [ "srcPort", "dstPort", "dataLength", "tcpFlags", "tcpSeq", "tcpAck", "tcpWindow", "tcpUrg", "tcpOptions" ]
}
}
} else if [protocol] == "17" {
# UDP
ruby {
path => "/home/user/addFields.rb"
script_params => {
"values" => "restOfLine"
"keys" => [ "srcPort", "dstPort", "dataLength" ]
}
}
} else if [protocol] == "112" {
# CARP
ruby {
path => "/home/user/addFields.rb"
script_params => {
"values" => "restOfLine"
"keys" => [ "type", "carpTtl", "vhid", "carpVersion", "advskew", "advbase" ]
}
}
}
}
}
That will get you UDP events like
{
"anchor" => "",
"interface" => "igb3",
"tos" => "0x0",
"rule" => "5",
"protocol" => "17",
"protocolName" => "udp",
"dstPort" => "5678",
"tracker" => "1000000103",
"id" => "0",
"srcPort" => "5678",
"srcIp" => "177.85.233.73",
"dstIp" => "255.255.255.255",
"ttl" => "64",
"offset" => "0",
"direction" => "in",
"reason" => "match",
"flags" => "none",
"subrule" => "",
"message" => "<134>Sep 29 09:03:57 filterlog: 5,,,1000000103,igb3,match,block,in,4,0x0,,64,0,0,none,17,udp,134,177.85.233.73,255.255.255.255,5678,5678,114",
"length" => "134",
"@version" => "1",
"ecn" => "",
"@timestamp" => 2020-09-29T13:03:57.000Z,
"syslog_pri" => "134",
"ipVersion" => "4",
"action" => "block",
"restOfLine" => [],
"dataLength" => "114"
}
and TCP events like
{
"anchor" => "",
"interface" => "igb1",
"tcpSeq" => "2184934504",
"tos" => "0x0",
"rule" => "5",
"protocol" => "6",
"protocolName" => "tcp",
"dstPort" => "443",
"tracker" => "1000000103",
"id" => "10381",
"srcPort" => "51333",
"srcIp" => "10.0.5.22",
"dstIp" => "62.67.238.152",
"tcpAck" => "",
"ttl" => "128",
"tcpWindow" => "8192",
"tcpOptions" => "mss;nop;wscale;nop;nop;sackOK",
"offset" => "0",
"direction" => "in",
"reason" => "match",
"flags" => "DF",
"subrule" => "",
"message" => "<134>Sep 29 09:03:57 filterlog: 5,,,1000000103,igb1,match,block,in,4,0x0,,128,10381,0,DF,6,tcp,52,10.0.5.22,62.67.238.152,51333,443,0,S,2184934504,,8192,,mss;nop;wscale;nop;nop;sackOK",
"length" => "52",
"tcpUrg" => "",
"@version" => "1",
"ecn" => "",
"@timestamp" => 2020-09-29T13:03:57.000Z,
"syslog_pri" => "134",
"ipVersion" => "4",
"action" => "block",
"restOfLine" => [],
"dataLength" => "0",
"tcpFlags" => "S"
}
I leave it as an exercise for the reader to add IPv6 handling, ICMP handling, and error handling.
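As a possible starting point for the error handling, here is one variant of the script that checks there are enough values left before consuming any of them. This is a sketch, not a tested production filter. The tag name `_addfields_short` is invented here; `event.tag` is the event API call that adds a tag to the event.

```ruby
# Called once at startup with the script_params from the ruby filter
def register(params)
  @keys = params['keys']
  @values = params['values']
end

def filter(event)
  values = event.get(@values)
  # If the field is missing, is not an array, or is too short for the
  # requested keys, tag the event and leave its fields untouched.
  unless values.is_a?(Array) && values.length >= @keys.length
    event.tag('_addfields_short') # hypothetical tag name
    return [event]
  end
  # Otherwise behave as before: one value per key, in order.
  @keys.each { |k| event.set(k, values.shift) }
  event.set(@values, values)
  [event]
end
```

A conditional such as 'if "_addfields_short" in [tags]' could then route those events somewhere for inspection instead of indexing half-parsed fields.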