Hi Folk ,
I'm new on ELK since many days I have configure a pipeline for parsing CEF logs from netscout device . When I'm checking logs I see some _grokparsefailure tage on Kibana . I conclude that there is some issue in the parsing . can you please help me to find the issue ? . the grok parser is built to pars
Example of Log
Jul 8 23:47:31 trammell.tb.xxx.net CEF:0|NETSCOUT|Arbor Edge Defense|6.7.0.0|TLS Attack Prevention|Blocked Host|5|rt=1625787991000 src=6.94.137.215 dpt=5
5024 cn2=9 proto=TCP dst=34.4.133.112 spt=993 cs2Label=Protection Group Name cn2Label=Protection Group ID cs2=Default Protection Group
Logstash pipeline
nput {
udp {
#codec => cef { delimiter => "\r\n"}
port => 10514
type => syslog
tags => ["NETSCOUT-UDP"]
codec => cef
}
}
filter {
# Filter only CEF logs here
if "NETSCOUT-UDP" in [tags]{
# Manipulate the message
mutate {
# Saved the original message into a temporary field
add_field => { "tmp_message" => "%{message}" }
# splits message on the "|" and has index numbers
split => ["message", "|"]
# SAMPLE
# generate fields for the CEF header
add_field => { "[cef][version]" => "%{[message][0]}" }
add_field => { "[cef][device][vendor]" => "%{[message][1]}" }
add_field => { "[cef][device][product]" => "%{[message][2]}" }
add_field => { "[cef][device][version]" => "%{[message][3]}" }
add_field => { "[cef][device][event_class_id]" => "%{[message][4]}" }
add_field => { "[cef][name]" => "%{[message][5]}" }
add_field => { "[cef][severity]" => "%{[message][6]}" }
add_tag => [ "CEF-NETSCOUT-ARBOR" ]
}
# Parse the message with field=value formats for arbor
kv {
# Note: values with spaces are lost (still getting there)
field_split => " "
trim_key => "<>\[\], "
trim_value => "<>\[\],"
# Only included the fields which are of interest (dont need everything)
allow_duplicate_values => false
include_keys => ["deviceCustomString2","rt","sev","cs3Label","dstPort",
cs7Label","cs7","cs6","cs1","cs6Label","cs3","cs2"]
}
prune {
whitelist_values => [ "match_id", "^[0-9]{3}$" ]
}
mutate {
# Rename fields to cef_field_name
rename => [ "src", "[cef][source][geoip][ip]"]
rename => [ "shost", "[cef][source][host]"]
rename => [ "dhost", "[cef][destination][host]"]
rename => [ "spt", "[cef][source][port]"]
rename => [ "dpt", "[cef][destination][port]"]
rename => [ "proto", "[cef][network][transport]"]
rename => [ "dst", "[cef][destination][geoip][ip]"]
rename => [ "rt", "[cef][time]"]
rename => [ "deviceEventClassId", "[Attack_Categeory]"]
rename => [ "deviceCustomNumber1", "[Element_Id]"]
rename => [ "deviceCustomNumber2", "[Protection_group_ID]"]
rename => [ "deviceCustomString1", "[IOC_Pattern]"]
rename => [ "deviceCustomString2", "[Protection_Group_Name]"]
rename => [ "deviceCustomString3", "[Match_Type]"]
rename => [ "deviceCustomString4", "[TAXII_Collection_ID]"]
rename => [ "deviceCustomString5", "[TAXII_Collection_Title]"]
rename => [ "deviceCustomString6", "[Threat_Name]"]
rename => [ "deviceCustomString7", "[Threat_Category]"]
rename => [ "name", "[Alert_Type]"]
rename => [ "syslog", "[message_tmp2]"]
# Rename fields to cef_field_name
rename => [ "cs3", "[Match_Type]"]
rename => [ "cs4", "[TAXII_Collection_ID"]
rename => [ "cs5", "[TAXII_Collection_Title]"]
rename => [ "cs6", "[Threat_Name]"]
rename => [ "cs7", "[Threat_Category]"]
rename => [ "cs1", "[IOC_Pattern]"]
# Revert original message and remove temporary field
replace => { "message" => "%{tmp_message}" }
replace => { "syslog" => "%{{tmp_message2}" }
remove_field => [ "tmp_message" ]
}
grok {
match => { "message_tmp2" => "%{MONTH:month} %{MONTHDAY:day} %{TIME:time} %{SYSLOGHOST:hostname}" }
}
geoip {
source => "[cef][source][geoip][ip]"
target => "[cef][source][geoip][location]"
}
geoip {
source => "[cef][destination][geoip][ip]"
target => "[cef][destination][geoip][location]"
}
date {
match => ["[cef][time]","UNIX_MS"]
remove_field => [ "[cef][time]" ]
}
mutate {
remove_field => [ "event_class_id", "device", "cef", "time", "day", "mess
][device][version]", "[cef][name]" ]
# removed fields to cef_field_name
remove_field => [ "cn1Label" ]
remove_field => [ "cn2Label" ]
remove_field => [ "deviceCustomString1Label" ]
remove_field => [ "deviceCustomString2Label" ]
remove_field => [ "deviceCustomString3Label" ]
remove_field => [ "deviceCustomString4Label" ]
remove_field => [ "deviceCustomString5Label" ]
remove_field => [ "deviceCustomString6Label" ]
remove_field => [ "deviceCustomString7Label" ]
remove_field => [ "deviceCustomNumber1Label" ]
remove_field => [ "deviceCustomNumber2Label" ]
remove_field => [ "cs1Label" ]
remove_field => [ "host" ]
remove_field => [ "cs3Label" ]
remove_field => [ "cs4Label" ]
remove_field => [ "cs5Label" ]
remove_field => [ "cs6Label" ]
remove_field => [ "cs7Label" ]
remove_field => [ "cs2Label" ]
remove_field => [ "cs2Label" ]
remove_field => [ "cn1Label" ]
remove_field => [ "cn2Label" ]
}
}
}
## Output es01 es02 stack #
output {
if "NETSCOUT-UDP" in [tags]{
elasticsearch {
index => "cef-aed-v2-264"
hosts => ["https://es-node-01:9200"]
ssl => true
ssl_certificate_verification => true
cacert => "/etc/logstash/elasticsearch-ca.pem"
manage_template => true
user => "elastic"
password => 'xxxx'
codec => "plain"
manage_template => true
template_name => "cef"
}
#stdout { codec => rubydebug }
}
}