Hello, thank you for your response. I will share some sample messages in syslog format. Note that I changed only the values in the logs, nothing related to the format.
<190>Jan 19 2023 08:53:29 USG6630E-01-DC %%01POLICY/6/POLICYPERMIT(l):vsys=public, protocol=9, source-ip=255.255.255.255, source-port=51290, destination-ip=255.255.255.255, destination-port=999999, time=Jan 19 2023 09:53:29, source-zone=zone1, destination-zone=zone2, application-name=HTTPS, rule-name=zone1-zone2.\u0000
<190>2023-01-19 08:53:29 USG6630E-01-DC %%01SECLOG/6/SESSION_TEARDOWN(l):IPVer=4,Protocol=udp,SourceIP=255.255.255.255,DestinationIP=255.255.255.255,SourcePort=52912,DestinationPort=52912,BeginTime=1674118371,EndTime=1674118371,SendPkts=1,SendBytes=75,RcvPkts=1,RcvBytes=132,SourceVpnID=0,DestinationVpnID=0,SourceZone=zone1,DestinationZone=zone2,PolicyName=zone1-zone2,CloseReason=aged-out,ApplicationName=HTTP.
<189>Jun 5 2023 10:34:40 USG6630E-01-DC %%01SHELL/5/CMDRECORDFAILED(s)[71473]:Recorded command information. (Task=IMDT, Ip=**, VpnName=, User=_system_, AuthenticationMethod=\"Null\", Command=\"pki import-certificate ca filename file.pem\", Result=ExecutionFailure)\n
<188>Jun 6 2023 13:53:05 USG6630E-01-DC IPSTRAP/4/THREATTRAP:OID 1.3.6.1.4.1.2011.6.122.43.1.2.8 An intrusion was detected. (SrcIp=255.255.255.255, DstIp=255.255.255.255, SrcPort=63318, DstPort=63318, Protocol=UDP, Event=CnC Domain: Trojan: Floxif: 5isohu.com, DetectTime=2023/06/06 14:53:06)\n
<190>Jun 5 2023 12:20:41 USG6630E-01-DC %%01POLICY/6/POLICYDENY(l):vsys=public, protocol=999, source-ip=255.255.255.255, source-port=49298, destination-ip=255.255.255.255, destination-port=49298, time=Jun 5 2023 13:20:41, source-zone=zone1, destination-zone=zone2, application-name=, rule-name=default.\u0000\n
<188>Jun 6 2023 13:53:05 USG6630E-01-DC IPSTRAP/4/THREATTRAP:OID 1.3.6.1.4.1.2011.6.122.43.1.2.8 An intrusion was detected. (SrcIp=255.255.255.255, DstIp=255.255.255.255, SrcPort=63318, DstPort=63318, Protocol=UDP, Event=CnC Domain: Floxif, DetectTime=2023/06/06 14:53:06)\n
<188>Jun 6 2023 13:30:43 USG6630E-01-DC DS/4/DATASYNC_CFGCHANGE:OID 1.3.6.1.4.1.2011.5.25.191.3.1 configurations have been changed. The current change number is 167, the change loop count is 0, and the maximum number of records is 9999.\n
<185>Jun 13 2023 14:06:22 USG6630E-01-DC HRPI/1/COCHK:1.3.6.1.4.1.2011.6.122.51.2.2.4 The configurations between active and standby device is different(The key pairs of the active and standby devices are inconsistent(VsysID = 0). To solve this problem, run the pki rsa local-key-pair backup all-sys and pki certificate backup all-sys commands.).\n
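All eight samples share one header layout: a syslog PRI, one of two timestamp formats, the hostname, then `MODULE/severity/ACTION` with an optional `%%01` prefix, an optional `(l)`/`(s)` type and `[id]`, and then the message. The SNMP-trap style logs add an OID before the text, written as `OID 1.3.6...` in most samples but as a bare `1.3.6...` in the last one. Below is a minimal Ruby sketch (Ruby because the filter already uses a `ruby` block) of one regular expression covering all of these shapes, assuming the samples are representative; `HEADER_RE`, `parse_header` and the capture names are hypothetical. It also decodes the PRI as facility * 8 + severity, as defined by the syslog RFCs; in these samples the PRI severity matches the digit in the identifier (190 gives 6, 189 gives 5, 188 gives 4, 185 gives 1).

```ruby
# Hypothetical header parser, inferred from the sample lines only.
HEADER_RE = %r{
  \A<(?<pri>\d{1,3})>
  (?<timestamp>
      \d{4}-\d{2}-\d{2}\s\d{2}:\d{2}:\d{2}                    # 2023-01-19 08:53:29
    | [A-Z][a-z]{2}\s+\d{1,2}\s+\d{4}\s+\d{2}:\d{2}:\d{2}     # Jun 5 2023 10:34:40
  )
  \s+(?<host>\S+)\s+
  (?:%%(?<prefix>\d+))?                          # optional %%01 prefix
  (?<module>[A-Z]+)/(?<severity>\d)/(?<action>[A-Z_]+)
  (?:\((?<type>[a-z])\))?                        # optional one-letter type
  (?:\[(?<seq>\d+)\])?                           # optional sequence number
  :\s*
  (?:(?:OID\s+)?(?<oid>\d+(?:\.\d+){3,})\s+)?    # optional SNMP trap OID
  (?<body>.*)
  \z
}xm

# Returns a Hash of the named captures plus the decoded PRI, or nil if the line does not match.
def parse_header(line)
  match = HEADER_RE.match(line)
  return nil unless match

  fields = match.named_captures
  pri = fields["pri"].to_i
  fields["syslog_facility"] = pri / 8   # PRI = facility * 8 + severity
  fields["syslog_severity"] = pri % 8
  fields
end
```

Because the OID group is optional and only accepts a run of at least four dot-separated numbers, ordinary bodies such as `vsys=public, ...` or `IPVer=4,...` pass through untouched, while the last sample gets its OID separated from its free text instead of being split in the middle of the number.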
The problem is that many fields carry the same information under different names, such as "SrcIp" and "source-ip", or "ApplicationName" and "application-name", and the timestamp comes in different formats (for example "Jan 19 2023 08:53:29" and "2023-01-19 08:53:29").
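One way to handle the duplicated names is an explicit alias table instead of regex-based renaming: every spelling seen in the samples maps to a single dotted name, and anything not listed passes through unchanged. The two timestamp layouts can likewise be tried in turn. This is a plain-Ruby sketch; `FIELD_ALIASES`, `normalize_keys` and `parse_time` are hypothetical names, the target names are only a suggestion, and mapping `PolicyName` to the same field as `rule-name` is an assumption based on both carrying `zone1-zone2` in the samples.

```ruby
require "time"

# Hypothetical alias table: each spelling seen in the samples maps to one dotted name.
# PolicyName -> rule.name is an assumption (both carry zone1-zone2 in the samples).
FIELD_ALIASES = {
  "source-ip" => "source.ip", "SrcIp" => "source.ip", "SourceIP" => "source.ip",
  "destination-ip" => "destination.ip", "DstIp" => "destination.ip", "DestinationIP" => "destination.ip",
  "source-port" => "source.port", "SrcPort" => "source.port", "SourcePort" => "source.port",
  "destination-port" => "destination.port", "DstPort" => "destination.port", "DestinationPort" => "destination.port",
  "source-zone" => "source.zone", "SourceZone" => "source.zone",
  "destination-zone" => "destination.zone", "DestinationZone" => "destination.zone",
  "application-name" => "application.name", "ApplicationName" => "application.name",
  "rule-name" => "rule.name", "PolicyName" => "rule.name"
}.freeze

# Rename known aliases; keys not in the table pass through unchanged.
def normalize_keys(fields)
  fields.each_with_object({}) do |(key, value), out|
    out[FIELD_ALIASES.fetch(key, key)] = value
  end
end

# The two layouts seen in the samples: "2023-01-19 08:53:29" and "Jun 5 2023 10:34:40".
TIME_FORMATS = ["%Y-%m-%d %H:%M:%S", "%b %d %Y %H:%M:%S"].freeze

# Try each format with a fixed UTC offset; +01:00 mirrors the Etc/GMT-1 setting of the filter.
# Returns a UTC Time, or nil if no format matches.
def parse_time(text, offset = "+01:00")
  TIME_FORMATS.each do |fmt|
    begin
      return Time.strptime("#{text} #{offset}", "#{fmt} %z").utc
    rescue ArgumentError
      next
    end
  end
  nil
end
```

Inside Logstash, the same table could also be written as a `mutate { rename => { ... } }` block, which only renames fields that exist on the event. One thing worth checking: in the first sample, `time=Jan 19 2023 09:53:29` read as UTC+01:00 is the same instant as the header `Jan 19 2023 08:53:29` read as UTC, so if the anonymization left the times intact, the header may be in UTC rather than UTC+1.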
I am also sharing the filter I initially wrote to normalize the logs. It is not perfect (for example, it does not work with the last log), but maybe you can help me improve it.
filter{
grok {
#Custom patterns: log identifier, log description, and the timestamp at the start of the log
#(declared once as a single hash rather than in three separate pattern_definitions settings)
pattern_definitions => {
"log_identifier" => "[^=]+"
"log_description" => "([^=]+\.|)"
"FWtimestamp" => "%{MONTH}\s*%{MONTHDAY}\s*%{YEAR}\s*%{TIME}"
}
#Separate the log content from the header
match => {
"message" => [
"%{SYSLOG5424PRI}%{FWtimestamp:[log][timestamp]} %{HOSTNAME:[log][hostname]} %{log_identifier:[log][identifier]}:%{log_description:[log][description]}%{GREEDYDATA:[log][content]}",
"%{SYSLOG5424PRI}%{TIMESTAMP_ISO8601:[log][timestamp]} %{HOSTNAME:[log][hostname]} %{log_identifier:[log][identifier]}:%{log_description:[log][description]}%{GREEDYDATA:[log][content]}"
]
}
#remove_field => [ "message" ]
add_field => {"[log][source]" => "Firewall Huawei"}
}
#Split the log identifier into facility, category, severity, action, type and id
grok {
match => { "[log][identifier]" => "(%%|)(?<[log][facility]>\d+)?(?<[log][category]>[A-Z]+)\/(?<[log][severity]>\d+)\/(?<[log][action]>[A-Z_]+)(\((?<[log][type]>[a-z])\)(?:\[%{POSINT:[log][id]}\])?)?"}
#remove_field => [ "[log][identifier]" ]
}
#Remove the NUL character, parentheses and newlines from the content
mutate {
gsub => [
"[log][content]", "\u0000","",
"[log][content]", "\(","",
"[log][content]", "\)","",
"[log][content]", "\n",""
]
}
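#Extract the "Key=Value" pairs separated by a comma and optional whitespace
#(",\s*" rather than ",\s+", because the SESSION_TEARDOWN log has no space after its commas)
kv {
source => "[log][content]"
field_split_pattern => ",\s*"
value_split => "="
trim_key => " \""
#Also trim the "." that ends the last value (for example rule-name=zone1-zone2.)
trim_value => " \"."
}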
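#If the log has its own "time" field, use it as the event timestamp
if [time] {
mutate {
remove_field => "[log][timestamp]"
}
date {
#"d" accepts both one- and two-digit days ("Jun 5" and "Jan 19")
match => ["time","MMM d yyyy HH:mm:ss"]
#Etc/GMT-1 means UTC+01:00 (the sign of the Etc/GMT zone names is inverted)
timezone => "Etc/GMT-1"
target => "@timestamp"
}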
}
#If there is no time field, use the timestamp from the header
else {
date {
match => ["[log][timestamp]","yyyy-MM-dd HH:mm:ss","MMM d yyyy HH:mm:ss"]
timezone => "Etc/GMT-1"
target => "@timestamp"
}
mutate {
remove_field => "[log][timestamp]"
}
}
ruby {
# Normalize similar fields such as SrcIp, SourceIP and source-ip into one name
code => '
event.to_hash.each {|k, v|
#Map a leading Src/Source/source- (or Dst/Destination/destination-) prefix to "source." (or "destination.")
#and lowercase the result, so SrcIp, SourceIP and source-ip all end up as source.ip
new_key = k.sub(/\A(?:Src|Source|source-)/, "source.").sub(/\A(?:Dst|Destination|destination-)/, "destination.")
next if new_key == k
new_key = new_key.downcase
event.set(new_key, v)
event.remove(k)
}
#This part splits a CamelCase field name into dotted parts (for example UserName becomes User.Name)
event.to_hash.each { |k, v|
if k.match(/\A[A-Z][A-Za-z]*[A-Z][A-Za-z0-9_]*\z/) && (v.is_a?(String) || v.is_a?(Numeric))
new_field = k.gsub(/(?<=\p{Lower})(?=\p{Upper})|_/, ".")
#Only move the field if the name changed, otherwise a key such as IPVer would be deleted
if new_field != k
event.set(new_field, v)
event.remove(k)
end
elsif v.is_a?(Hash)
v.each { |sub_k, sub_v|
if sub_k.is_a?(String) || sub_k.is_a?(Numeric)
new_sub_field = "#{k}.#{sub_k}"
event.set(new_sub_field, sub_v)
end
}
event.remove(k)
#elsif v.is_a?(Hash) || v.is_a?(LogStash::Event)
# v.to_hash.each { |sub_k, sub_v|
# if sub_k.match(/^[A-Z][A-Za-z]*[A-Z](?:[A-Za-z0-9_]*)*$/) && sub_v.is_a?(String)
# new_sub_field = sub_k.gsub(/(?<=\p{Lower})(?=\p{Upper})/, ".")
# event.set("[#{k}][#{new_sub_field}]",sub_v)
# event.remove("[#{k}][#{sub_k}]")
# end
# }
end
}
'
}
}
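To check the content-stage logic without restarting Logstash, the cleanup, key/value split and CamelCase renaming can be mirrored in plain Ruby. This is a sketch, not what the `kv` filter does internally: `clean_content`, `split_pairs`, `dotted_key` and `parse_content` are hypothetical helpers, and stripping one trailing period assumes every message ends with a `.` as in the samples. The code avoids single quotes so pieces can be pasted into a `code => '...'` string. It shows two things: splitting on `,\s+` leaves the SESSION_TEARDOWN sample as a single pair (the same pattern in `kv` cannot split there either, since `\s+` requires whitespace after the comma), and a key such as `IPVer`, which the renaming regex leaves unchanged, must be kept rather than removed.

```ruby
# Hypothetical plain-Ruby mirror of the mutate/kv/ruby steps, for testing outside Logstash.

# Whitespace, quotes and backslashes to strip from both ends, like trim_key/trim_value.
TRIM = /\A[\s"\\]+|[\s"\\]+\z/

# Remove NUL bytes, parentheses, newlines (real or written as \n) and the final period.
def clean_content(text)
  text.delete("\u0000()")
      .gsub(/\\n|\n/, "")
      .strip
      .sub(/\.\z/, "")
end

# Split "Key=Value" pairs; the separator defaults to a comma plus optional whitespace.
def split_pairs(text, separator = /,\s*/)
  text.split(separator).each_with_object({}) do |pair, out|
    key, value = pair.split("=", 2)
    next if value.nil?                      # fragment without "=", skip it
    out[key.gsub(TRIM, "")] = value.gsub(TRIM, "")
  end
end

# CamelCase to dotted name; keys the regex leaves unchanged (such as IPVer) are kept as-is.
def dotted_key(key)
  return key unless key.match?(/\A[A-Z][A-Za-z]*[A-Z][A-Za-z0-9_]*\z/)
  key.gsub(/(?<=\p{Lower})(?=\p{Upper})|_/, ".")
end

# Full content stage: clean, split into pairs, then rename the keys.
def parse_content(text)
  split_pairs(clean_content(text)).transform_keys { |k| dotted_key(k) }
end
```

For example, `parse_content("IPVer=4,Protocol=udp,SourcePort=52912.")` keeps `IPVer` and `Protocol` as they are and turns `SourcePort` into `Source.Port`.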