Logstash tags entries as_grokparsefailures but multiple online debuggers says otherwise

Hi All, I've a very strange question that I hope to explain eloquently.

I'm dealing with FW logs. Already wrote a grok parser that works like a charm (prefer not to provided it here, let me know if it's going to be needed?) and everything is fine, I receive the alerts as I want them in my SIEM.

However when going through the log file /var/log/logstash/logstash-palin.log I can see some _grokparsefailures such as the below (I deliberately obfuscated some of the info)

************The log entry obfuscated*****************
+02:00 Info     XXX Remove: type=LIN|proto=UDP|srcIF=|srcIP=1.1.1.1|srcPort=691|srcMAC=FF:FF:FF:FF:FF:FF|dstIP=1.1.1.1|dstPort=691|dstService=XXX-MGMT|dstIF=p1.111|rule=OP-SRV-VPN|info=Unreachable Timeout|srcNAT=1.1.1.1|dstNAT=1.1.1.1|duration=-719825|count=1|receivedBytes=1255922857|sentBytes=41005632880|receivedPackets=7757015|sentPackets=37685863|user=|protocol=|application=|target=|content=|urlcat=

The odd thing is that when I use an online grok debugger such Grok Debugger or This One or This one the regex actually works and the log is being parsed exactly as I want. But for some reason this is still being tagged as _grokparsefailures in the log file and I can't see the log entries in my SIEM.

Now, one thing that comes to mind is that I have a cidr plugin in the logstash configuration that is checking if the source is external and if it is, it drops the entry.
In the dropped entries above the Source IP is indeed external (actually it is Company-owned, but I am checking against a particular private network and for it it's actually external). Having said that these entries should be dropped. So is it possible that by being dropped the log entries are actually flagged as _grokparsefailures? And why my parser fails since multiple online grok debuggers says otherwise?

Any response will be much appreciated. Thank you.

Yes with a code like this :

filter {
  if "_grokparsefailure" in [tags] {
    drop {}
  }
}

Without the grok pattern it is pretty hard to tell but did you try your pattern with a breakline at the end of the message in the grok debuggers ? Because in the error log, message contains it.

Cad

Another entry was tagged with _grokparsefailuresand it actually matches the grok filter???

+08:00 Security XXX Block: ???|UDP(17)|pvpn0|111:7c60:af8:5cff:efff:fffa:f691:76c|0|00:00:00:00:00:00|ff02::4d2d:5345:4152:4348:202a:2048|12112|||<no-match>|4003|(null)|(null)|0|3|0|0|0|0||||||

I wonder what the culprit to this might be?
@Cad any ideas now when I posted the regex?

Hi,

Your grok is pretty hard to read, i don't see where the error came from.
With the pattern of your data one other possibility to get informations without only using grok pattern but grok with gsub and csv plugins, i explain:

First you should do a grok to divided your data in 2 parts:

  • part1 :
    +08:00 Security XXX Block
  • part2:
    ???|UDP(17)|pvpn0|111:7c60:af8:5cff:efff:fffa:f691:76c|0|00:00:00:00:00:00|ff02::4d2d:5345:4152:4348:202a:2048|12112|||<no-match>|4003|(null)|(null)|0|3|0|0|0|0||||||

With a pattern like this :

match => {
  "message" => "^(?<Timezone>\+[\d]{2}:[\d]{2})\s+%{WORD:LogLevel}\s+(?<OriginSource>[a-zA-Z0-9]+)\s+%{WORD:Action}:\s+%{GREEDYDATA:Informations}$"
}

You have 5 fields Timezone, LogLevel, OriginSource, Action and Informations.

And Informations contains
???|UDP(17)|pvpn0|111:7c60:af8:5cff:efff:fffa:f691:76c|0|00:00:00:00:00:00|ff02::4d2d:5345:4152:4348:202a:2048|12112|||<no-match>|4003|(null)|(null)|0|3|0|0|0|0||||||
But Intormations could be
type=LIN|proto=UDP|srcIF=|srcIP=1.1.1.1|srcPort=691|srcMAC=FF:FF:FF:FF:FF:FF|dstIP=1.1.1.1|dstPort=691|dstService=XXX-MGMT|dstIF=p1.111|rule=OP-SRV-VPN|info=Unreachable Timeout|srcNAT=1.1.1.1|dstNAT=1.1.1.1|duration=-719825|count=1|receivedBytes=1255922857|sentBytes=41005632880|receivedPackets=7757015|sentPackets=37685863|user=|protocol=|application=|target=|content=|urlcat=

So Informations is a constant number of key-value with a pattern key=value. But unfortunatly you don't have a key everytime. So you should remove the key= with gsub filter, if it exist, and specify the key in the csv plugin.

The configuration should be like this :

filter {
  grok {
    match => {
      "message" => "^(?<Timezone>\+[\d]{2}:[\d]{2})\s+%{WORD:LogLevel}\s+(?<OriginSource>[a-zA-Z0-9]+)\s+%{WORD:Action}:\s+%{GREEDYDATA:Informations}$" 
    }
  }

  mutate {
    gsub {
      # Replace pipe and equals with something between with just a pipe (remove all keys)
      "Informations", "\|[.*]+=", "\|"
    }
  }

  csv {
    # Split on a pipe and put values in the columns
    source => "Informations"
    separator => "|"
    columns => [ "TrafficType", "L4Protocol", "SrcInterface", "SourceIP", "SourcePort", "SourceMAC", "DestinationIP", "DestinationPort", "DstService", "DstIF", "Rule", "Info", "SourceNAT", "DestinationNAT", "Duration", "Count", "ReceivedBytes", "SentBytes", "ReceivedPackets", "SentPackets", "User", "L7Protocol", "Application", "Target", "Content", "URLCategory" ]
  }
}

I hope my explanation is understandable. Tell me if it isn't.

Cad.

1 Like

Hi @Cad, this is a very good approach, never thought about it and I gave it a go but it seems the gsub does not working, ill try to narrow it down :), might also be cause some of the filed are empty, i.e. no value between ||, I included keep_empty_captures => true but no joy?

Otherwise, on my regex there are no errors, if you run the entry through it in an online grok debugger you'll see no errors, but for some reason in the Logstash log file, there were several entries which according to it didn't match as were tagged as _grokparsefailures

Thank you very much for your response again.

Hi,

Did you try with this gsub pattern "\|[a-zA-Z]+=" instead of this "\|[.*]+=". I think the one i give in my last answer is to inclusive.
The error can't came from || because the gsub pattern don't match it. You can try your gsub here if you want.

Can you please add stdout { codec => rubydebug } in the output part and show us the content of one event when you have a _grokpersefailure.

Cad.

1 Like

It's working @Cad, many thanks for showing me this method. it was a good exercise me building the regex but this is way more elegant approach as one does not need to deal with grok failures now and then because of missing a particular character in an entry :slight_smile:

1 Like

Hi again @Cad not sure what I did but now the events are displayed with two back slashes at the tail end

The Config I use now is:

input
{
        stdin { }
      
}

filter
{
        grok {
                keep_empty_captures => true
                match => { "message" => "^(?<Timezone>\+[\d]{2}:[\d]{2})\s+%{WORD:LogLevel}\s+(?<OriginSource>[a-zA-Z0-9]+)\s+%{WORD:Action}:\s+%{GREEDYDATA:Informations}$" }
             }

        mutate {
                gsub => [ "Informations", "\|?[a-zA-Z]+=", "\|" ]
               }

        csv {
                        source => "Informations"
                        separator => "|"
                        columns => [ "TrafficType", "L4Protocol", "SrcInterface", "SourceIP", "SourcePort", "SourceMAC", "DestinationIP", "DestinationPort", "DstService", "DstIF", "Rule", "Info", "SourceNAT", "DestinationNAT", "Duration", "Count", "ReceivedBytes", "SentBytes", "ReceivedPackets", "SentPackets", "User", "L7Protocol", "Application", "Target", "Content", "URLCategory" ]
            }
}
output
{
        stdout { codec => rubydebug }

Have you got any idea why the these dashes are there?

Hi,

It's my mistake. I think you have to change "\|" by "|" in the gsub option. It's a string so we don't need to escape the pipe.

Cad.

1 Like

Working now :slight_smile:

Just one final question. The empty captures used to be displayed as empty string "" now its says nil, is that the same?

It's because sometimes there is nothing between two pipes

ruby {
                code => "
                        hash = event.to_hash
                        hash.each do |k,v|
                                if v == nil
                                        event.set(k, "")
# remove it with event.remove(k)
                                end
                        end
                "
        }

Something like this should replace nil fields by an empty string.

Cad.

1 Like

Apologies for the belated response @Cad , thank you again :slight_smile:

This topic was automatically closed 28 days after the last reply. New replies are no longer allowed.