HI guys,
I'm trying to create a logstash pipeline that parses incoming CEF logs, apply some logic and then outputs the log in JSON format to the console. For some reason, a field is generated with the name "Virtual System". The value of this field is the following key=value pair.
I don't understand why this field is created and why it has this value. I understand that the name is caused by the renaming of field "cs3". However, that field doesn't have a value and should therefore not be included in the output. (As stated by the allow_empty_values statement) I have tried to remove this entire field by placing it in a remove_field array but that doesn't work.
I need to resolve this since I want to perform further logic on the value of the field "act". (This is not possible since the entire key=value match is being parsed as a value to the "virtual system" field).
Another question: I there a simple way to output the new parsed log (that now contains fewer and different fields) as a CEF message instead of JSON. I have tried the cef codec but that didn't work. If I use the plain codec, I don't see any changes to the original message and there is no delimiter after each log.
Raw message:
"message" => "<14>Apr 27 00:38:45 paloaltovm CEF:0|PaloAltoNetworks|PAN-OS|11.0.1|Succeeded|CONFIG|1|rt=Apr 26 2023 22:38:44 GMT deviceExternalId=63CCFFBBE1C0D07 shost=82.217.2.18 cs3= act=set duser=PaloAlto destinationServiceName=Web msg= vsys vsys1 rulebase security rules blabla externalId=7226424316814950417 PanOSDGl1=0PanOSDGl2=0 PanOSDGl3=0 PanOSDGl4=0 PanOSVsysName= dvchost=paloaltovm PanOSActionFlags=0x0 cs1Label=\"Before Change Detail\" cs1={} cs2Label=\"After Change Detail\" cs2={blabla 55c6e493-a011-4556-ac9e-6f5318913d3d { to [ any ]; from [ any ]; source [ testadress ]; destination [ any ]; source-user [ any ]; category [ any ]; application [ any ]; service [ application-default ]; source-hip [ any ]; destination-hip [ any ]; tag [ tag1 ]; action allow; rule-type universal; description yoyo!; } } PanOSFWDeviceGroup=0 PanOSPolicyAuditComment=auditcommenting"
pipeline output:
"Virtual System" => "act=set",
"CEF_device_vendor" => "PaloAltoNetworks",
"After Change Details" => "{blabla",
"Severity" => "1",
"host" => {
"ip" => "172.20.0.4"
},
"Before Change Details" => "{}",
"CEF_version" => "0",
"msg" => "vsys",
"CEF_device_product" => "PAN-OS",
"@timestamp" => 2023-04-26T22:38:46.401601528Z,
"PanOSVsysName" => "dvchost=paloaltovm",
"PanOSPolicyAuditComment" => "auditcommenting",
"destinationServiceName" => "Web",
"Sublogtype" => "Succeeded",
"Firewall_ID" => "63CCFFBBE1C0D07",
"Logtype" => "CONFIG",
"CEF_device_version" => "11.0.1",
"source_host" => "82.217.2.18",
"@version" => "1",
"duser" => "PaloAlto"
Filter configuration in the pipeline:
filter {
#PART 1A: PARSING OF ALL LOGS
grok {
#Parsing the CEF message into the CEF fields and extensions
match => { 'message' => '(?<Syslogdata>.*(?=CEF))CEF:%{INT:CEF_version}\|%{WORD:CEF_device_vendor}\|(?<CEF_device_product>\w+-\w+)\|(?<CEF_device_version>\d+\.\d+\.\d+)\|(?<Sublogtype>\b\w+(?:-\w+)?\b)\|%{WORD:Logtype}\|%{INT:Severity}\|rt=%{MONTH:Month} %{MONTHDAY:Day} %{YEAR:Year} %{TIME:Time} %{WORD:Timezone} %{GREEDYDATA:extensions}'}
}
#Parses all extension fields into different field=Value matches
kv {
source => "extensions" #split all extensions into field=value pairs
field_split => " "
value_split => "="
trim_value => "\""
allow_empty_values => false
tag_on_failure => ["extensions_parse_error"]
#removal of fields that are not needed.
remove_field => [Syslogdata,Month,Year,Day,Timezone,Time,"Virtual System","event",extensions,"PanOSDGl4","PanOSDGl3","PanOSDGl1","PanOSDGl2","PanOSDGl4","PanOSActionFlags", "PanOSFWDeviceGroup","flexNumber1Label","flexNumber2Label","flexString1Label","flexString2Label", "cn1Label","cn2Label","cn3Label","cs1Label", "cs2Label", "cs3Label","cs4Label","cs5Label","cs6Label","externalId"]
}
#PART 1B: RENAMING EXTENSION FIELDS
if [Logtype] == "CONFIG" {
mutate { #Renamings for the config logs
rename => {
"cs3" => "Virtual System"
"cs1" => "Before Change Details(cs1)"
"cs2" => "After Change Details(cs2)"
"cat" => "EventID"
"act" => "action"
}
}
}
if [Logtype] == "SYSTEM" {
mutate { #Renamings for the system logs
rename => {
"flexString2" => "Module"
"cat" => "category"
}
}
}
if [Logtype] == "TRAFFIC" {
mutate { #Renamings for the traffic logs
rename => {
"cn1" => "SessionID"
"cn2" => "Packets"
"cn3" => "Elapsed time in seconds"
"cs2" => "URL_Category"
"cs4" => "Source_Zone"
"cs5" => "Destination_Zone"
"cs6" => "Log_Profile"
"flexNumber1" => "Total_Bytes"
"flexString1" => "Flags"
"flexString2" => "Module"
"cat" => "Action_Source"
}
}
}
if [Logtype] == "Authentification" {
mutate { #Renamings for the authentification logs
rename => {
"cn1" => "Factor Number"
"cn2" => "AuthenticationID"
"cs1" => "Server Profile"
"cs2" => "Normalize User"
"cs4" => "Authentication Policy"
"cs5" => "Client Type"
"cs6" => "Log Action"
"flexNumber1" => "Total Bytes"
"flexString2" => "Vendor"
}
}
}
mutate { #general renamings that apply to all logtypes
rename => {
"shost" => "source_host"
"deviceExternalId" => "Firewall_ID"
"fname"=>"file_name"
"PanOSTimeGeneratedHighResolution" => "Date_Time"
"cs3" => "Virtual System"
}
}
#PART 2: FILTERING OF EVENTS
# if statements to drop unnecessary events
if [Logtype] == "SYSTEM" or [logtype] == "AUTHENTICATION" {
if [Sublogtype] in ["global-protect", "general", "dnsproxy","auth"] {
if [Severity] == "1" {
if ([msg] =~ "(^user \w+ logged in.*)|(^failed authentification for user \w+.*)|(config.*)") {
#pass
} else {
drop {} #drops all prio 1 (informational) alerts of the general sublogtype that do not contain the listed mentioned message.
}
}else if ([Severity] == "3") and ([msg] =~ "(.*Reason: Authentication profile not found.*)"){
drop {} #don't want to recieve failed login attempts to users that do not exist.
}} else {
drop {} #drops all logs from the system and authentification logs, that are not part of listed sublogtypes
}
}else if [Logtype] == "TRAFFIC" {
if [Sublogtype] not in ["start", "end"] {
# if logs are not about allowed traffic, then drop
drop {}
}
# }else if [Logtype] == "CONFIG"{
# if [Sublogtype] not in []}
}else {
#pass
}