Grok Parse failure but passing configtest


(Paul Mc Clure) #1

Hey first post here!

I'm getting a grok parse failure in my tags field:
tags netsyslog, _grokparsefailure

I'm really not sure where I've gone wrong though. The config seems fine and passes the configtest. Fields are indexed properly browsing in Kibana (besides the grokparsefailure in the tags field).

input {
  file {
    path => ["/var/log/network.log"]
    # sincedb_path => "/var/log/logstash/"
    start_position => "beginning"
    type => "syslog"
    tags => [ "netsyslog" ]
  }
  
  file {
    path => ["/var/log/threat.log"]
    # sincedb_path => "/var/log/logstash/"
    start_position => "beginning"
    type => "syslog"
    tags => [ "threats" ]
  }

} #end input



filter {
  if [type] == "syslog" {
    grok {
      #strips timestamp and host off of the front of the syslog message leaving the raw message generated by the syslog client and saves it as "raw_message"
      #patterns_dir => "/opt/logstash/patterns"
      match => [ "message", "%{TIMESTAMP_ISO8601:@timestamp} %{HOST:syslog_host} %{GREEDYDATA:raw_message}" ]
    }
  }



if "netsyslog" in [tags]
{

    csv
     {
      source => "raw_message"
      columns => [ "PaloAltoDomain","ReceiveTime","SerialNum","Type","Threat-ContentType","ConfigVersion","GenerateTime","SourceAddress","DestinationAddress","NATSourceIP","NATDestinationIP","Rule","SourcUser","DestinationUser","$
     }

     date
     {
      timezone => "America/Vancouver"
      match => [ "GenerateTime", "YYYY/MM/dd HH:mm:ss" ]
     }
     #convert fields to proper format
     mutate
     {
      convert => [ "Bytes", "integer" ]
      convert => [ "BytesReceived", "integer" ]
      convert => [ "BytesSent", "integer" ]
      convert => [ "ElapsedTimeInSec", "integer" ]
      convert => [ "geoip.area_code", "integer" ]
      convert => [ "geoip.dma_code", "integer" ]
      convert => [ "geoip.latitude", "float" ]
      convert => [ "geoip.longitude", "float" ]
      convert => [ "NATDestinationPort", "integer" ]
      convert => [ "NATSourcePort", "integer" ]
      convert => [ "Packets", "integer" ]
      convert => [ "pkts_received", "integer" ]
      convert => [ "pkts_sent", "integer" ]
      convert => [ "seqno", "integer" ]
      gsub => [ "Rule", " ", "_",
                "Application", "( |-)", "_" ]
      remove_field => [ "message", "raw_message" ]
     } #end mutate

} #end netsyslog




else if "threats" in [tags]
{

    csv
    {
      source => "raw_message"
      columns => [ "PaloAltoDomain","ReceiveTime","SerialNum","Type","Threat-ContentType","ConfigVersion","GenerateTime","SourceAddress","DestinationAddress","NATSourceIP","NATDestinationIP","Rule","SourcUser","DestinationUser","$
    }

    date
    {
      timezone => "America/Vancouver"
      match => [ "GenerateTime", "YYYY/MM/dd HH:mm:ss" ]
    }

    #convert fields to proper format
    mutate
    {
      convert => [ "geoip.area_code", "integer" ]
      convert => [ "geoip.dma_code", "integer" ]
      convert => [ "geoip.latitude", "float" ]
      convert => [ "geoip.longitude", "float" ]
      convert => [ "NATDestinationPort", "integer" ]
      convert => [ "NATSourcePort", "integer" ]
      convert => [ "seqno", "integer" ]
      gsub => [ "Rule", " ", "_",
                "Application", "( |-)", "_" ]
      remove_field => [ "message", "raw_message" ]
    }

} #end threats



#Geolocate logs that have SourceAddress and if that SourceAddress is a non-RFC1918 address
  if [SourceAddress] and [SourceAddress] !~ "(^127\.0\.0\.1)|(^10\.)|(^172\.1[6-9]\.)|(^172\.2[0-9]\.)|(^172\.3[0-1]\.)|(^192\.168\.)|(^169\.254\.)" {
      geoip {
           database => "/opt/logstash/GeoLiteCity.dat"
           source => "SourceAddress"
           target => "SourceGeo"
      }
      #Delete 0,0 in SourceGeo.location if equal to 0,0
      if ([SourceGeo.location] and [SourceGeo.location] =~ "0,0") {
        mutate {
          replace => [ "SourceGeo.location", "" ]
        }
      }
    }

  #Geolocate logs that have DestinationAddress and if that DestinationAddress is a non-RFC1918 address
  if [DestinationAddress] and [DestinationAddress] !~ "(^127\.0\.0\.1)|(^10\.)|(^172\.1[6-9]\.)|(^172\.2[0-9]\.)|(^172\.3[0-1]\.)|(^192\.168\.)|(^169\.254\.)" {
      geoip {
           database => "/opt/logstash/GeoLiteCity.dat"
           source => "DestinationAddress"
           target => "DestinationGeo"
      }
      #Delete 0,0 in DestinationGeo.location if equal to 0,0
      if ([DestinationGeo.location] and [DestinationGeo.location] =~ "0,0") {
        mutate {
          replace => [ "DestinationGeo.location", "" ]
        }
      }
    }




} #end filter block




output {
  elasticsearch {
    protocol => "node"
    node_name => "logstash"
    cluster => "elasticsearch"
    host => "127.0.0.1"
    template => "/opt/logstash/elasticsearch-template.json"
    template_overwrite => true
  }
} #end output block

Can you get this failure from syntax problems? This is my first ELK stack so sorry for the noobness.


(Mark Walkom) #2

Try using http://grokdebug.herokuapp.com/ to check your grok syntax against a sample line.

The configtest only validates the general syntax of the config, including the grok block. It won't check that the pattern actually matches your log lines.
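
The same check can also be run locally with a throwaway pipeline that reads lines from stdin and prints the parsed event. This is only a sketch: the file name test.conf is made up for illustration, and the grok pattern is copied unchanged from the config posted above.

```
# test.conf (hypothetical file name): paste one sample log line on stdin
input {
  stdin {
    type => "syslog"
  }
}

filter {
  grok {
    # same pattern as in the posted config
    match => [ "message", "%{TIMESTAMP_ISO8601:@timestamp} %{HOST:syslog_host} %{GREEDYDATA:raw_message}" ]
  }
}

output {
  # print every field of the event, including tags
  stdout {
    codec => rubydebug
  }
}
```

Starting Logstash with `-f test.conf` and pasting a line from /var/log/network.log should show whether `_grokparsefailure` ends up in `tags` for that line.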


(Magnus Bäck) #3

Please supply an example message.

match => [ "message", "%{TIMESTAMP_ISO8601:@timestamp} %{HOST:syslog_host} %{GREEDYDATA:raw_message}" ]

Don't grok directly into @timestamp. Let a date filter populate @timestamp.


(Paul Mc Clure) #4

Sorry for the delay, I didn't have access to the computers yesterday.

I have two inputs coming into logstash. One is tagged as netsyslog, the other as threats. Both sample lines seem to work fine for me in the grok debugger.

netsyslog example:

2015-07-09T09:44:03-07:00 192.168.157.205 domain-name1,2015/07/09 09:44:03,001801017645,TRAFFIC,end,1,2015/07/09 09:44:03,10.19.98.58,205.250.85.99,96.53.33.204,205.250.85.99,Guest-Untrust,,,web-browsing,vsys1,Trust,Untrust,ethernet1/2,ethernet1/6,Forward Syslog to ELK,2015/07/09 09:44:03,176465,1,56817,80,37133,80,0x42001c,tcp,allow,2143,710,1433,10,2015/07/09 09:43:48,1,any,0,432189386,0x0,10.0.0.0-10.255.255.255,CA,0,6,4,tcp-fin

threat example:

 2015-07-09T09:02:47-07:00 192.168.157.205 domain-name1,2015/07/09 09:02:47,001801017645,THREAT,vulnerability,1,2015/07/09 09:02:47,176.9.245.139,10.200.0.103,176.9.245.139,207.102.139.81,Trust-Untrust-Applications,,,web-browsing,vsys1,Untrust,Trust,ethernet1/1,ethernet1/2,Forward Syslog to ELK,2015/07/09 09:02:47,137472,1,80,60378,80,42715,0x424000,tcp,reset-both,"viewtopic.php",ANGLER Exploit Kit Detection(37796),any,critical,server-to-client,58123986,0x0,DE,10.0.0.0-10.255.255.255,0,,0,,,1,,,,,,,,0

(Jeremy Page) #5

The logstash config check just tests whether the syntax is valid. A grokparsefailure just says that grok could not find any match for the event.

Similar to what Magnus posted, I'd change "%{TIMESTAMP_ISO8601:@timestamp}" to "%{TIMESTAMP_ISO8601:timestamp}" and add in a date filter:

    date {
      locale => "en"
      match => [ "timestamp", "dd/MMM/yyyy:HH:mm:ss Z", "yyyy-MM-dd HH:mm:ss,SSS", "yyyy-MM-dd HH:mm:ss,SSSZ", "ISO8601" ]
    }

Also for simplification I'd take out the if statement since both your inputs should be syslog...
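
Because `_grokparsefailure` is the default failure tag of every grok filter, the tag alone does not say which filter failed. One possible way to narrow it down, sketched here with a made-up tag name, is to give this grok its own failure tag through the `tag_on_failure` option:

```
filter {
  grok {
    match => [ "message", "%{TIMESTAMP_ISO8601:timestamp} %{HOST:syslog_host} %{GREEDYDATA:raw_message}" ]
    # "_grokparsefailure_header" is an illustrative name, not a Logstash default
    tag_on_failure => [ "_grokparsefailure_header" ]
  }

  date {
    # the sample lines start with an ISO8601 timestamp such as 2015-07-09T09:44:03-07:00
    match => [ "timestamp", "ISO8601" ]
  }
}
```

If events then still carry the plain `_grokparsefailure` tag, that would suggest the failure comes from a grok filter other than this one.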


(Paul Mc Clure) #6

Thanks Jeremy,

I changed the timestamp and got rid of the if statement like so:

filter {

grok {
  #strips timestamp and host off of the front of the syslog message leaving the raw message generated by the syslog client and saves it as "raw_message"
  #patterns_dir => "/opt/logstash/patterns"
  match => [ "message", "%{TIMESTAMP_ISO8601:timestamp} %{HOST:syslog_host} %{GREEDYDATA:raw_message}" ]
}

Also added the date filter within the csv block. Still getting the grokparsefailure in the tags section in Kibana though:

tags	  	netsyslog, _grokparsefailure

Any other ideas or is there something fundamentally wrong in my setup?

thanks

