How can I generate a CEF output

I have created a logstash configuration that successfully parses CEF logs and applies certain logic to it. The filter configuration extracts the CEF with a grok filter and then uses the kv plugin to extract the different extension fields which are then renamed and removed if needed. All the extension fields are placed in an separate array "new_extensions"

Now I would like to generate a modified output in CEF format. Simply saying this would be the value of the field "CEF_header" + the value of all the keys in the "new_extensions" array. I have tried everything but I cannot make it work. Can anyone give some advice?

I have tried using the cef codec in the output but that doesn't completely work since it places the complete key=values matches as extensions
Output generated when using the CEF output plugin:

CEF:0|PaloAltoNetworks|PAN-OS|11.0.1|auth|SYSTEM|6|extensionkeys=["Firewall_ID","63CCFFBBE1C0D07","Module","general","Hostname","paloaltovm","category","auth-fail","msg","failed authentication for user 'admin'.  Reason: Authentication profile not found for the user. From: 73.112.30.5.","CEST_Date_Time","2023-05-01T14:00:57.579+02:00\\n"]/r/n

I can't use the CEF codec in the input since the fields that are used change for each logtype.

current Logstash config:

input {
  udp {
    port => 514
  }
}
#
filter {
#PART 1A: PARSING OF ALL LOGS
  grok {
	#Parsing the CEF message into the CEF fields and extensions
    match => { 'message' => '(?<Syslogdata>.*(?=CEF))CEF:%{INT:CEF_version}\|%{WORD:CEF_device_vendor}\|(?<CEF_device_product>\w+-\w+)\|(?<CEF_device_version>\d+\.\d+\.\d+)\|(?<Sublogtype>\b\w+(?:-\w+)?\b)\|%{WORD:Logtype}\|%{INT:Severity}\|rt=%{MONTH:Month} %{MONTHDAY:Day} %{YEAR:Year} %{TIME:Time} %{WORD:Timezone} %{GREEDYDATA:extensions}'}
  }
	#Parses all extension fields into different field=Value matches
  kv {
    source => "extensions" #split all extensions into field=value pairs
    target => "new_extensions"
    recursive => false
    field_split => ";"
    value_split => "="
    tag_on_failure => ["extensions_parse_error"]
    allow_empty_values => false
    add_field => { #stores the entire CEF header separately 
        **"CEF_Header**" => "CEF:%{CEF_version}|%{CEF_device_vendor}|%{CEF_device_product}|%{CEF_device_version}|%{Sublogtype}|%{Logtype}|%{Severity}|rt=%{Year} %{Month} %{Day} %{Time} %{Timezone}"
        }
    #removal of fields that are not needed and the rest in a new field
    remove_field => ["[new_extensions][Syslogdata]","[new_extensions][event]","[new_extensions][externalId]","[new_extensions][PanOSActionFlags]","[new_extensions][PanOSDGl1]","[new_extensions][PanOSDGl2]","[new_extensions][PanOSDGl3]","[new_extensions][PanOSDGl4]","[new_extensions][PanOSFWDeviceGroup]","[new_extensions][PanOSVsysName]","[new_extensions][Virtual System]","[extensions]","[Syslogdata]","[Month]","[Time]","[Timezone]","[Year]","[Day]"]
    remove_field => ["[new_extensions][cs1Label]","[new_extensions][cs2Label]","[new_extensions][cs3Label]","[new_extensions][cs4Label]","[new_extensions][cs5Label]","[new_extensions][cs6Label]","[new_extensions][flexString1Label]","[new_extensions][flexString2Label]","[new_extensions][cn1Label]","[new_extensions][cn2Label]","[new_extensions][cn3Label]","[new_extensions][flexNumber1Label]","[new_extensions][flexNumber2Label]"]
    remove_field => ["[event]","[event][original]","[event][ip]"]
}


#PART 1B: RENAMING EXTENSION FIELDS
        if [Logtype] == "CONFIG" {
            mutate { #Renamings for the config logs
                rename => {
                 "[new_extensions][cs1]" => "[new_extensions][Before Change Details]"
                 "[new_extensions][cs2]" => "[new_extensions][After Change Details]"
                 "[new_extensions][cat]" => "[new_extensions][EventID]"
                 "[new_extensions][duser]" => "[new_extensions][ActorUsername]"
                 "[new_extensions][shost]" => "[new_extensions][sourceHostName]"
                 }
                 }
        }
        if [Logtype] == "SYSTEM" {
            mutate { #Renamings for the system logs
                rename => {
                  "[new_extensions][flexString2]" => "[new_extensions][Module]"
                  "[new_extensions][cat]" => "[new_extensions][category]"
                }
                }
        }
        if [Logtype] in ["TRAFFIC", "URL", "WILDFIRE"] {
            mutate { #Renamings for the traffic logs
                 rename => {
                   "[new_extensions][cn1]" => "[new_extensions][SessionID]"
                   "[new_extensions][cn2]" => "[new_extensions][Packets]"
                   "[new_extensions][cn3]" => "[new_extensions][Elapsed_time_in_seconds]"
                   "[new_extensions][cs1]" => "[new_extensions][Rule]"
                   "[new_extensions][cs2]" => "[new_extensions][URL_Category]"
                   "[new_extensions][cs4]" => "[new_extensions][Source_Zone]"
                   "[new_extensions][cs5]" => "[new_extensions][Destination_Zone]"
                   "[new_extensions][cs6]" => "[new_extensions][Log_Profile]"
                   "[new_extensions][flexNumber1]" => "[new_extensions][Total_Bytes]"
                   "[new_extensions][flexString2]" => "[new_extensions][Direction]"
                   "[new_extensions][cat]" => "[new_extensions][Action_Source]"
                   "[new_extensions][src]" => "[new_extensions][Source_IP]"
                   "[new_extensions][dst]" => "[new_extensions][Destination_IP]"
                   "[new_extensions][app]" => "[new_extensions][Application_Protocol]"
                   "[new_extensions][start]" => "[new_extensions][Start_time]"
                   "[new_extensions][end]" => "[new_extensions][End_time]"
                   "[new_extensions][in]" => "[new_extensions][BytesIn]"
                   "[new_extensions][out]" => "[new_extensions][BytesOut]"
                   "[new_extensions][proto]" => "[new_extensions][UsedProtocol]"
                 }
                 }
         }
        if [Logtype] == "AUTHENTIFICATION" {
            mutate { #Renamings for the authentification logs
                 rename => {
                   "[new_extensions][cn1]" => "[new_extensions][Factor_Number]"
                   "[new_extensions][cn2]" => "[new_extensions][AuthenticationID]"
                   "[new_extensions][cs1]" => "[new_extensions][Server_Profile]"
                   "[new_extensions][cs2]" => "[new_extensions][Normalize_User]"
                   "[new_extensions][cs4]" => "[new_extensions][Authentication_Policy]"
                   "[new_extensions][cs5]" => "[new_extensions][Client_Type]"
                   "[new_extensions][cs6]" => "[new_extensions][Log_Action]"
                   "[new_extensions][flexNumber1]" => "[new_extensions][Total_Bytes]"
                   "[new_extensions][flexString2]" => "[new_extensions][Vendor]"
                 }
                 }
        }
        if [Logtype] in ["THREAT", "DATA"]{
                mutate { #Renamings for the authentification logs
                         rename => {
                           "[new_extensions][cn1]" => "[new_extensions][SessionID]"
                           "[new_extensions][cn2]" => "[new_extensions][AuthenticationID]"
                           "[new_extensions][cs1]" => "[new_extensions][Rule]"
                           "[new_extensions][cs2]" => "[new_extensions][URL_Category]"
                           "[new_extensions][cs4]" => "[new_extensions][Source_Zone]"
                           "[new_extensions][cs5]" => "[new_extensions][Client_Type]"
                           "[new_extensions][cs6]" => "[new_extensions][Log_Profile]"
                           "[new_extensions][flexString2]" => "[new_extensions][Direction]"
                         }
                         }
        }
         mutate { #general renamings that apply to all logtypes
                 rename => {
                  "[new_extensions][deviceExternalId]" => "[new_extensions][Firewall_ID]"
                  "[new_extensions][fname]"=>"[new_extensions][File_Name]"
                  "[new_extensions][PanOSTimeGeneratedHighResolution]" => "[new_extensions][CEST_Date_Time]"
                  "[new_extensions][cs3]" => "[new_extensions][Virtual System]"
                  "[new_extensions][flexString1]" => "[new_extensions][Flags]"
                  "[new_extensions][act]" => "[new_extensions][DeviceAction]"
                  "[new_extensions][PanOSPolicyAuditComment]" => "[new_extensions][AuditComment]"
                  "[new_extensions][cat]" => "[new_extensions][DeviceEventCategory]"
                  "[new_extensions][dpt]" => "[new_extensions][Destination_Port]"
                  "[new_extensions][duid]" => "[new_extensions][Destination_UserID]"
                  "[new_extensions][duser]" => "[new_extensions][Destination_User]"
                  "[new_extensions][dvchost]" => "[new_extensions][Hostname]"
                  "[new_extensions][suser]" => "[new_extensions][Source_Username]"
                  }
                  }

#PART 2: FILTERING OF EVENTS
  # if statements to drop unnecessary events
if [Logtype] == "SYSTEM" or [Logtype] == "AUTHENTICATION" {
	if [Sublogtype] in ["global-protect", "general", "dnsproxy","auth-fail","auth","dns-proxy"] {
		if [Severity] == "1" {
			if [new_extensions][msg] =~ "(^User \w+ logged in.*)|(^failed authentification for user \w+.*)|(Commit job succeeded.*)" {
				#pass
			} else {drop {}} #pass
		} else {} #pass
	}else {drop{}} #drop all system and auth logs that are not of the mentioned sublogtypes

} else if [Logtype] == "TRAFFIC" {
	if [new_extensions][DeviceAction] != "allow" {
		drop{}
	} else if [Sublogtype] not in ["start", "end"] {
		drop{}
	} else if [new_extensions][Source_IP] =~ "^(10\.|172\.(1[6-9]|2[0-9]|3[01])\.|192\.168\.)(\d{1,3}\.){1,3}\d{1,3}$" { # check if the source is internal.
		if [new_extensions][Destination_IP] =~ "^(10\.|172\.(1[6-9]|2[0-9]|3[01])\.|192\.168\.)(\d{1,3}\.){1,3}\d{1,3}$" { # check if the destination is also internal.
			drop{}
		}else {} #pass
	}else {
	#pass
	}

} else if [Logtype] == "CONFIG" {
    if [Sublogtype] not in ["Succeeded","commit"] {
        drop {} # drops all logs from the config logtype, that are not part of listed sublogtypes.
    }else {
       if [new_extensions][DeviceAction] not in ["add", "delete", "edit", "set"] {
          drop {} # if the performed action is not addition, removal, or modification, then drop.
       }else{} #pass
	}
}else{
drop{} #drop all other logtypes
}
}

#PART 3: PUTTING EVERYTHING TOGETHER FOR THE OUTPUT
#PART 4: SENDING IT TO SENTINEL
output {
  stdout {}
  }

Current output:

        "new_extensions" => {
           "Firewall_ID" => "63CCFFBBE1C0D07",
                "Module" => "general",
              "Hostname" => "paloaltovm",
              "category" => "general",
                   "msg" => "User PaloAlto logged in via Web from 82.217.2.18 using https",
        "CEST_Date_Time" => "2023-05-01T14:44:12.970+02:00\n"
    },
               "message" => "<14>May  1 14:44:12 paloaltovm CEF:0|PaloAltoNetworks|PAN-OS|11.0.1|general|SYSTEM|1|rt=May 01 2023 12:44:12 GMT deviceExternalId=63CCFFBBE1C0D07;cs3=;fname=;flexString2Label=\"Module\";flexString2=general;msg=\"User PaloAlto logged in via Web from 82.217.2.18 using https\";externalId=7228109898205102356;cat=general;PanOSDGl1=0PanOSDGl2=0;PanOSDGl3=0;PanOSDGl4=0PanOSVsysName=;dvchost=paloaltovm;PanOSActionFlags=0x0;PanOSTimeGeneratedHighResolution=2023-05-01T14:44:12.970+02:00\n",                                                                                                                                                                                                  
           "CEF_version" => "0",
            "CEF_Header" => "CEF:0|PaloAltoNetworks|PAN-OS|11.0.1|general|SYSTEM|1|rt=2023 May 01 12:44:12 GMT",
                  "host" => {
        "ip" => "172.20.0.4"
    },
    "CEF_device_version" => "11.0.1",
              "@version" => "1",
               "Logtype" => "SYSTEM",
              "Severity" => "1",
    "CEF_device_product" => "PAN-OS",
     "CEF_device_vendor" => "PaloAltoNetworks",
            "@timestamp" => 2023-05-01T12:44:14.836054340Z,
            "Sublogtype" => "general"

Use a line codec. Build the message differently for each [Logtype].

    if [Logtype] == "SYSTEM" {
        mutate { add_field => { "payload" => "ab:%{[new_extensions][flexString2]} cd:%{[new_extensions][cat]}" } }
    }

and output

  stdout { codec => line { format => "%{CEF_header}|%{payload}" }

in which case you do not actually need all of the mutate+rename logic.

This topic was automatically closed 28 days after the last reply. New replies are no longer allowed.