Need assistance on CEF _grokparsefailure

Hi Folk ,

I'm new on ELK since many days I have configure a pipeline for parsing CEF logs from netscout device . When I'm checking logs I see some _grokparsefailure tage on Kibana . I conclude that there is some issue in the parsing . can you please help me to find the issue ? . the grok parser is built to pars

Example of Log

Jul  8 23:47:31 trammell.tb.xxx.net CEF:0|NETSCOUT|Arbor Edge Defense|6.7.0.0|TLS Attack Prevention|Blocked Host|5|rt=1625787991000 src=6.94.137.215 dpt=5
5024 cn2=9 proto=TCP dst=34.4.133.112 spt=993 cs2Label=Protection Group Name cn2Label=Protection Group ID cs2=Default Protection Group

Logstash pipeline 

nput {

  udp {
    #codec => cef { delimiter => "\r\n"}
    port => 10514
    type => syslog
    tags => ["NETSCOUT-UDP"]
    codec => cef
  }
}

filter {
# Filter only CEF logs here
  if "NETSCOUT-UDP" in [tags]{
    # Manipulate the message
    mutate {
         # Saved the original message into a temporary field
         add_field => { "tmp_message" => "%{message}" }
         # splits message on the "|"  and has index numbers
         split => ["message", "|"]
         # SAMPLE
         # generate fields for the CEF header
         add_field => { "[cef][version]" => "%{[message][0]}" }
         add_field => { "[cef][device][vendor]" => "%{[message][1]}" }
         add_field => { "[cef][device][product]" => "%{[message][2]}" }
         add_field => { "[cef][device][version]" => "%{[message][3]}" }
         add_field => { "[cef][device][event_class_id]" => "%{[message][4]}" }
         add_field => { "[cef][name]" => "%{[message][5]}" }
         add_field => { "[cef][severity]" => "%{[message][6]}" }
         add_tag => [ "CEF-NETSCOUT-ARBOR" ]
    }
    # Parse the message with field=value formats for arbor

    kv {
        # Note: values with spaces are lost (still getting there)
         field_split => " "
         trim_key => "<>\[\], "
         trim_value => "<>\[\],"
         # Only included the fields which are of interest (dont need everything)
         allow_duplicate_values => false
         include_keys => ["deviceCustomString2","rt","sev","cs3Label","dstPort",
cs7Label","cs7","cs6","cs1","cs6Label","cs3","cs2"]

    }


    prune {
         whitelist_values => [ "match_id", "^[0-9]{3}$" ]
    }




    mutate {
        # Rename fields to cef_field_name
        rename => [ "src",    "[cef][source][geoip][ip]"]
        rename => [ "shost",    "[cef][source][host]"]
        rename => [ "dhost",    "[cef][destination][host]"]
        rename => [ "spt",    "[cef][source][port]"]
        rename => [ "dpt",    "[cef][destination][port]"]
        rename => [ "proto",  "[cef][network][transport]"]
        rename => [ "dst",    "[cef][destination][geoip][ip]"]
        rename => [ "rt",    "[cef][time]"]
        rename => [ "deviceEventClassId",     "[Attack_Categeory]"]
        rename => [ "deviceCustomNumber1",    "[Element_Id]"]
        rename => [ "deviceCustomNumber2",    "[Protection_group_ID]"]
        rename => [ "deviceCustomString1",    "[IOC_Pattern]"]
        rename => [ "deviceCustomString2",    "[Protection_Group_Name]"]
        rename => [ "deviceCustomString3",    "[Match_Type]"]
        rename => [ "deviceCustomString4",    "[TAXII_Collection_ID]"]
        rename => [ "deviceCustomString5",    "[TAXII_Collection_Title]"]
        rename => [ "deviceCustomString6",    "[Threat_Name]"]
        rename => [ "deviceCustomString7",    "[Threat_Category]"]
        rename => [ "name",   "[Alert_Type]"]



        rename => [ "syslog",   "[message_tmp2]"]
 # Rename fields to cef_field_name
        rename => [ "cs3",    "[Match_Type]"]
        rename => [ "cs4",    "[TAXII_Collection_ID"]
        rename => [ "cs5",    "[TAXII_Collection_Title]"]
        rename => [ "cs6",    "[Threat_Name]"]
        rename => [ "cs7",    "[Threat_Category]"]
        rename => [ "cs1",    "[IOC_Pattern]"]



        # Revert original message and remove temporary field
        replace => { "message" => "%{tmp_message}" }
        replace => { "syslog" => "%{{tmp_message2}" }
        remove_field => [ "tmp_message" ]
   }

    grok {
            match => { "message_tmp2" => "%{MONTH:month} %{MONTHDAY:day} %{TIME:time} %{SYSLOGHOST:hostname}" }
        }




   geoip {
        source => "[cef][source][geoip][ip]"
        target => "[cef][source][geoip][location]"
   }
   geoip {
        source => "[cef][destination][geoip][ip]"
        target => "[cef][destination][geoip][location]"
   }

   date {
      match => ["[cef][time]","UNIX_MS"]
      remove_field => [ "[cef][time]" ]
        }

  mutate {
       remove_field => [ "event_class_id", "device", "cef", "time", "day", "mess
][device][version]", "[cef][name]" ]


  # removed fields to cef_field_name

        remove_field => [ "cn1Label" ]
        remove_field => [ "cn2Label" ]
        remove_field => [ "deviceCustomString1Label" ]
        remove_field => [ "deviceCustomString2Label" ]
        remove_field => [ "deviceCustomString3Label" ]
        remove_field => [ "deviceCustomString4Label" ]
        remove_field => [ "deviceCustomString5Label" ]
        remove_field => [ "deviceCustomString6Label" ]
        remove_field => [ "deviceCustomString7Label" ]
        remove_field => [ "deviceCustomNumber1Label" ]
        remove_field => [ "deviceCustomNumber2Label" ]
        remove_field => [ "cs1Label" ]
        remove_field => [ "host" ]
        remove_field => [ "cs3Label" ]
        remove_field => [ "cs4Label" ]
        remove_field => [ "cs5Label" ]
        remove_field => [ "cs6Label" ]
        remove_field => [ "cs7Label" ]
        remove_field => [ "cs2Label" ]
        remove_field => [ "cs2Label" ]
        remove_field => [ "cn1Label" ]
        remove_field => [ "cn2Label" ]

          }



 }

}

## Output es01 es02 stack #

output {

if "NETSCOUT-UDP" in [tags]{

      elasticsearch {
                index => "cef-aed-v2-264"
                hosts => ["https://es-node-01:9200"]
                ssl => true
                ssl_certificate_verification => true
                cacert => "/etc/logstash/elasticsearch-ca.pem"
                manage_template => true
                user => "elastic"
                password  => 'xxxx'
                codec => "plain"
                manage_template => true
                template_name => "cef"
                     }
      #stdout { codec => rubydebug }

        }
}

Seems that your grok match statement is incomplete?

    grok {
            match => { "message_tmp2" => "%{MONTH:month} %{MONTHDAY:day} %{TIME:
        }

It would be better to use dissect instead and place all timestamp data into one field, i.e.:

dissect {
  mapping => {
    "message" => "%{+log_date/1->} %{+log_date/2} %{+log_date/3} %{message}"
  }

Hi Mad,
Here is the full grok match statement

    grok {
            match => { "message_tmp2" => "%{MONTH:month} %{MONTHDAY:day} %{TIME:time} %{SYSLOGHOST:hostname}" }
        }

I will try to use dissect . I also want to extract hostname of the device

Jul  8 23:47:31 trammell.tb.xxx.net

like this
Hostname = trammell.tb.xxx.net

Read a little bit about dissect filter because it's first choice when your logs format is always the same. To extract hostname just add it to mapping using correct delimiter, i.e.:

dissect {
  mapping => {
    "message" => "%{+log_date/1->} %{+log_date/2} %{+log_date/3} %{hostname} %{message}"
  }

Thanks for your help I'm trying

I have use this filter in order to resolve the issue with dissect


<%{priority}>%{syslog_timestamp} %{+syslog_timestamp} %{+syslog_timestamp} %{Hostname}

This topic was automatically closed 28 days after the last reply. New replies are no longer allowed.