Normalizing Huawei firewall logs

Hello, I am currently working on an ELK SIEM project, and one of the log sources I am working with is a Huawei firewall. Since Huawei firewall logs come in different formats, I would appreciate some suggestions and insights on best practices for parsing them to ensure seamless integration and enhanced visibility.

Also, I am using Logstash pipelines to ingest the logs.

Hello and welcome,

You would need to share some sample messages and the different formats; without them it is not possible to know what to suggest.

If you're using Elastic Security, you'll want to map your log sources to ECS. Here are two resources I suggest reviewing:

There isn't an integration for Huawei firewalls, but Elastic does have integrations for several other firewall vendors. Each integration package includes sample events that can serve as a starting point for how you'd want to map fields for Huawei:
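For example, once you've extracted the key/value pairs, a rough sketch of renaming a few typical firewall fields to ECS could look like the following (the left-hand names are assumptions based on common firewall logs, so swap in whatever your parser actually produces):

filter {
  mutate {
    rename => {
      "SourceIP"        => "[source][ip]"
      "SourcePort"      => "[source][port]"
      "DestinationIP"   => "[destination][ip]"
      "DestinationPort" => "[destination][port]"
      "Protocol"        => "[network][transport]"
      "PolicyName"      => "[rule][name]"
      "ApplicationName" => "[network][application]"
    }
  }
}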

Hello, thank you for your response. I will share some sample messages in the syslog format. Btw, I changed the logs, but only the values, nothing related to the format.

<190>Jan 19 2023 08:53:29 USG6630E-01-DC %%01POLICY/6/POLICYPERMIT(l):vsys=public, protocol=9, source-ip=255.255.255.255, source-port=51290, destination-ip=255.255.255.255, destination-port=999999, time=Jan 19 2023 09:53:29, source-zone=zone1, destination-zone=zone2, application-name=HTTPS, rule-name=zone1-zone2.\u0000
<190>2023-01-19 08:53:29 USG6630E-01-DC %%01SECLOG/6/SESSION_TEARDOWN(l):IPVer=4,Protocol=udp,SourceIP=255.255.255.255,DestinationIP=255.255.255.255,SourcePort=52912,DestinationPort=52912,BeginTime=1674118371,EndTime=1674118371,SendPkts=1,SendBytes=75,RcvPkts=1,RcvBytes=132,SourceVpnID=0,DestinationVpnID=0,SourceZone=zone1,DestinationZone=zone2,PolicyName=zone1-zone2,CloseReason=aged-out,ApplicationName=HTTP.
<189>Jun  5 2023 10:34:40 USG6630E-01-DC %%01SHELL/5/CMDRECORDFAILED(s)[71473]:Recorded command information. (Task=IMDT, Ip=**, VpnName=, User=_system_, AuthenticationMethod=\"Null\", Command=\"pki import-certificate ca filename file.pem\", Result=ExecutionFailure)\n
<188>Jun  6 2023 13:53:05 USG6630E-01-DC IPSTRAP/4/THREATTRAP:OID 1.3.6.1.4.1.2011.6.122.43.1.2.8 An intrusion was detected. (SrcIp=255.255.255.255, DstIp=255.255.255.255, SrcPort=63318, DstPort=63318, Protocol=UDP, Event=CnC Domain: Trojan: Floxif: 5isohu.com, DetectTime=2023/06/06 14:53:06)\n
<190>Jun  5 2023 12:20:41 USG6630E-01-DC %%01POLICY/6/POLICYDENY(l):vsys=public, protocol=999, source-ip=255.255.255.255, source-port=49298, destination-ip=255.255.255.255, destination-port=49298, time=Jun  5 2023 13:20:41, source-zone=zone1, destination-zone=zone2, application-name=, rule-name=default.\u0000\n
<188>Jun  6 2023 13:53:05 USG6630E-01-DC IPSTRAP/4/THREATTRAP:OID 1.3.6.1.4.1.2011.6.122.43.1.2.8 An intrusion was detected. (SrcIp=255.255.255.255, DstIp=255.255.255.255, SrcPort=63318, DstPort=63318, Protocol=UDP, Event=CnC Domain: Floxif, DetectTime=2023/06/06 14:53:06)\n
<188>Jun  6 2023 13:30:43 USG6630E-01-DC DS/4/DATASYNC_CFGCHANGE:OID 1.3.6.1.4.1.2011.5.25.191.3.1 configurations have been changed. The current change number is 167, the change loop count is 0, and the maximum number of records is 9999.\n
<185>Jun 13 2023 14:06:22 USG6630E-01-DC HRPI/1/COCHK:1.3.6.1.4.1.2011.6.122.51.2.2.4 The configurations between active and standby device is different(The key pairs of the active and standby devices are inconsistent(VsysID = 0). To solve this problem, run the pki rsa local-key-pair backup all-sys and pki certificate backup all-sys commands.).\n

The problem is that there are a lot of fields that share the same type of information, like "SrcIp" and "source-ip" or "ApplicationName" and "application-name", and the timestamp can be in different formats.
I will also share the filter plugin that I initially started with to normalize the logs. It's not perfect and doesn't work with the last log, for example, but maybe you can help me improve it.

filter {
  grok {
    # Custom patterns for the log identifier, the description, and the
    # "MMM dd yyyy HH:mm:ss" style timestamp at the start of the log
    pattern_definitions => {
      "log_identifier"  => "[^=]+"
      "log_description" => "([^=]+\.|)"
      "FWtimestamp"     => "%{MONTH}\s*%{MONTHDAY}\s*%{YEAR}\s*%{TIME}"
    }
    # Separate the log content from the header
    match => {
      "message" => [
        "%{SYSLOG5424PRI}%{FWtimestamp:[log][timestamp]} %{HOSTNAME:[log][hostname]} %{log_identifier:[log][identifier]}:%{log_description:[log][description]}%{GREEDYDATA:[log][content]}",
        "%{SYSLOG5424PRI}%{TIMESTAMP_ISO8601:[log][timestamp]} %{HOSTNAME:[log][hostname]} %{log_identifier:[log][identifier]}:%{log_description:[log][description]}%{GREEDYDATA:[log][content]}"
      ]
    }
    #remove_field => [ "message" ]
    add_field => { "[log][source]" => "Firewall Huawei" }
  }
  # Extract the pieces of the log identifier (e.g. %%01POLICY/6/POLICYPERMIT(l))
  grok {
    match => { "[log][identifier]" => "(%%|)(?<[log][facility]>\d+)?(?<[log][category]>[A-Z]+)\/(?<[log][severity]>\d+)\/(?<[log][action]>[A-Z_]+)(\((?<[log][type]>[a-z])\)(?:\[%{POSINT:[log][id]}\])?)?" }
    #remove_field => [ "[log][identifier]" ]
  }
  # Remove useless characters from the content (NUL characters, parentheses, newlines)
  mutate {
    gsub => [
      "[log][content]", "\u0000", "",
      "[log][content]", "\(", "",
      "[log][content]", "\)", "",
      "[log][content]", "\n", ""
    ]
  }
  # Extract the "Key=Value" pairs; the POLICY logs separate pairs with ", " while
  # SESSION_TEARDOWN uses a bare ",", so the whitespace has to be optional
  kv {
    source => "[log][content]"
    field_split_pattern => ",\s*"
    value_split => "="
    trim_key => " \""
    trim_value => " \""
  }
 
	
  # Use the "time" field from the log content as the event timestamp when present
  if [time] {
    mutate {
      remove_field => "[log][timestamp]"
    }
    date {
      # Single-digit days are padded with an extra space (e.g. "Jun  5 2023")
      match => ["time", "MMM dd yyyy HH:mm:ss", "MMM  d yyyy HH:mm:ss", "MMM d yyyy HH:mm:ss"]
      timezone => "Etc/GMT-1"   # POSIX-style sign: Etc/GMT-1 is UTC+1
      target => "@timestamp"
    }
  }
  # If there is no "time" field, fall back to the header timestamp
  else {
    date {
      match => ["[log][timestamp]", "yyyy-MM-dd HH:mm:ss", "MMM dd yyyy HH:mm:ss", "MMM  d yyyy HH:mm:ss", "MMM d yyyy HH:mm:ss"]
      timezone => "Etc/GMT-1"
      target => "@timestamp"
    }
    mutate {
      remove_field => "[log][timestamp]"
    }
  }
  ruby {
    # Normalize the duplicated field names (SrcIp/source-ip, DstIp/destination-ip, ...)
    # into a single "source.*" / "destination.*" scheme
    code => '
      event.to_hash.each { |k, v|
        new_key = k.gsub(/\A(?:Dst|Destination(?=[A-Z])|destination-)/, "destination.")
                   .gsub(/\A(?:Src|Source(?=[A-Z])|source-)/, "source.")
        # Lowercase the renamed keys so SrcIp and source-ip both become source.ip
        new_key = new_key.downcase if new_key != k
        event.set(new_key, v)
        event.remove(k) if new_key != k
      }
      # Split a CamelCase field into dotted fields (example: UserName becomes User.Name).
      # Note: event.set with a dotted string creates a literal field name that
      # contains dots; use the [a][b] syntax instead if you want real nesting.
      event.to_hash.each { |k, v|
        if k.match(/\A[A-Z][A-Za-z]*[A-Z][A-Za-z0-9_]*\z/) && (v.is_a?(String) || v.is_a?(Numeric))
          new_field = k.gsub(/(?<=\p{Lower})(?=\p{Upper})|_/, ".")
          event.set(new_field, v)
          # Only remove the original key if the name actually changed, otherwise
          # the field would be deleted right after being set (e.g. "IPVer")
          event.remove(k) if new_field != k
        elsif v.is_a?(Hash)
          # Flatten nested hashes (for example [log][hostname]) into dotted names
          v.each { |sub_k, sub_v|
            event.set("#{k}.#{sub_k}", sub_v)
          }
          event.remove(k)
        end
      }
    '
  }
}
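
For testing, a minimal wrapper with a stdin input and a rubydebug output can be used to feed the sample lines above through the filter (the pipeline file name below is just an example):

input {
  stdin {}
}
output {
  stdout {
    codec => rubydebug
  }
}

Then run bin/logstash -f huawei-test.conf and paste a sample line to inspect the parsed event.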

Thank you, I will take a look.
