Help pruning fields with "%{[foo][bar]}" values

Hi there!

I'm currently testing the latest PFelk with my firewall and I ran into a very specific situation that has only happened with a couple of log lines, but I would like to make it work. The problem is that, due to some processing (not using grok), certain fields in these log lines have no value, so the assigned field ends up holding the unresolved reference literally: [pf][tcp][options] = "%{[pfelk_csv][28]}".

I've tried using the prune filter with all kinds of regexes to blacklist values of that form, even hardcoding the specific value, but nothing works. I'm probably doing something wrong with the regex, or I simply don't understand the method.

Ideally, I could add a default value when doing the add_field for [pf][tcp][options]; but in general I would like to avoid a per-field conditional like 'if [pf][tcp][options] =~ /^%\{\[.+\]\}$/', mostly because there are 22 [pf] fields...
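For reference, the per-field workaround I would like to avoid looks something like this, repeated for every one of those 22 fields (a sketch, with the regex spelled out):

if [pf][tcp][options] =~ /^%\{\[.+\]\}$/ {
  mutate {
    remove_field => ["[pf][tcp][options]"]
  }
}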

Some more context:

The part of pfelk that handles these logs is "02-firewall.pfelk"; it processes the log field by field instead of simply using grok.

filter {
### filterlog ###
  if [log][syslog][appname] =~ /^filterlog$/ {
    .......
    mutate {
      split => { "pfelk_csv" => "," }
    }
    ...........
    if [network][protocol] == "tcp" {
      mutate {
        add_field => {
          .......
          "[pf][tcp][window]"  => "%{[pfelk_csv][26]}"
          "[pf][tcp][urg]"     => "%{[pfelk_csv][27]}"
          "[pf][tcp][options]" => "%{[pfelk_csv][28]}"
        }
      }
    }
  }
}

The input log looks like this:

<134>May 14 04:41:52 test.host filterlog: 5,,,1000000103,lagg1.232,match,block,in,4,0x0,,241,56358,0,none,6,tcp,40,8.8.8.8,9.9.7.7,43620,51497,0,S,3713913524,,1024,,

Is this your entire pipeline?

The way you are parsing it is far from ideal; it seems that you are using mutate's split to create an array and then add_field to populate the fields based on the array index.

A better way would be to use dissect to extract the csv part of your message (it is not clear whether you are already doing this, because you didn't share the full pipeline), and then use a csv filter with the skip_empty_columns option set to true.

Something like this:

dissect {
  mapping => {
    "pfelk_csv" => "%{}filterlog: %{csv_fields}"
  }
}

csv {
  source => "csv_fields"
  separator => ","
  skip_empty_columns => true
  columns => ["[field][name1]","[field][name2]","[field][name3]",...,"[field][nameN]"]
  remove_field => ["csv_fields"]
}

This way, if a column in your csv is empty, the destination field for it will not be populated.
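Applied to the sample line above, the csv part ends in ...,0,S,3713913524,,1024,, where the columns that previously became %{[pfelk_csv][27]} and %{[pfelk_csv][28]} ([pf][tcp][urg] and [pf][tcp][options]) are empty; with skip_empty_columns those fields would simply never be created, instead of holding an unresolved reference.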


That will not work. According to the documentation "This filter currently only support operations on top-level fields, i.e. whitelisting and blacklisting of subfields based on name or value does not work."
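If you still want to strip those unresolved values out of nested fields, a ruby filter can do what prune cannot. A minimal sketch, assuming the only bad values are strings that still start with %{:

ruby {
  code => '
    # collect the paths of unresolved "%{...}" values under [pf],
    # then remove them from the event
    bad = []
    walk = lambda do |prefix, hash|
      hash.each do |key, value|
        path = "#{prefix}[#{key}]"
        if value.is_a?(Hash)
          walk.call(path, value)
        elsif value.is_a?(String) && value.start_with?("%{")
          bad << path
        end
      end
    end
    pf = event.get("pf")
    walk.call("[pf]", pf) if pf.is_a?(Hash)
    bad.each { |path| event.remove(path) }
  '
}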

Ooh, this is a great improvement. I didn't share the whole pipeline because it is not mine, and I wanted to focus on the problem. However, feel free to take a look: pfelk/etc/pfelk/conf.d/02-firewall.pfelk at main · pfelk/pfelk · GitHub

I'm going to try your improvement, see if it works here, and open a pull request!

Ah, OK, so it was an understanding problem on my part.

Yeah, looking into that repository, it seems that the csv message is already isolated in a field called filter_message, which is then copied into pfelk_csv.

So you would need to run the csv filter on this field, naming all the columns.

But in the same pipeline some fields will have different names depending on the value of other fields, so the csv filter should use placeholder names for those columns, and then you rename them according to the conditionals (a sketch of this follows the examples below).

For example:

    if [network][type] == "4" {
      mutate {
        add_field => {
          "[pf][tos]"              => "%{[pfelk_csv][9]}"
          "[pf][ecn]"              => "%{[pfelk_csv][10]}"
          "[pf][ttl]"              => "%{[pfelk_csv][11]}"
          "[pf][id]"               => "%{[pfelk_csv][12]}"
          "[pf][offset]"           => "%{[pfelk_csv][13]}"
          "[pf][flags]"            => "%{[pfelk_csv][14]}"
          "[network][iana_number]" => "%{[pfelk_csv][15]}"
          "[network][protocol]"    => "%{[pfelk_csv][16]}"
          "[pf][packet][length]"   => "%{[pfelk_csv][17]}"
          "[source][ip]"           => "%{[pfelk_csv][18]}"
          "[destination][ip]"      => "%{[pfelk_csv][19]}"
        }
      }
    }

and

    if [network][type] == "6" {
      mutate {
        add_field => {
          "[pf][class]"            => "%{[pfelk_csv][9]}"
          "[pf][flow]"             => "%{[pfelk_csv][10]}"
          "[pf][hoplimit]"         => "%{[pfelk_csv][11]}"
          "[network][protocol]"    => "%{[pfelk_csv][12]}"
          "[network][iana_number]" => "%{[pfelk_csv][13]}"
          "[pf][packet][length]"   => "%{[pfelk_csv][14]}"
          "[source][ip]"           => "%{[pfelk_csv][15]}"
          "[destination][ip]"      => "%{[pfelk_csv][16]}"
        }
      }
    }

There are other cases like this in the same pipeline, so you would need to adjust the conditionals accordingly.
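A sketch of that placeholder-and-rename approach; the [pfelk][colN] names are hypothetical placeholders, not fields the pipeline already defines:

csv {
  source => "filter_message"
  separator => ","
  skip_empty_columns => true
  # index 8 ([network][type]) drives the v4/v6 conditionals; the
  # columns after it get neutral names because their meaning varies
  columns => ["[pfelk][col0]",...,"[pfelk][col7]","[network][type]","[pfelk][col9]","[pfelk][col10]","[pfelk][col11]",...]
}

if [network][type] == "4" {
  mutate {
    rename => {
      "[pfelk][col9]"  => "[pf][tos]"
      "[pfelk][col10]" => "[pf][ecn]"
      "[pfelk][col11]" => "[pf][ttl]"
    }
  }
}

Since rename on a field that does not exist is a no-op, empty columns skipped by the csv filter never turn into unresolved values.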

Another option is to not name the columns at all; they will then be saved as columnNUMBER, like column1, column15, etc., and you rename them as needed, as in the sketch below.
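That variant could look like this; note that the generated names are 1-based, so the old 0-based index 9 becomes column10:

csv {
  source => "filter_message"
  separator => ","
  skip_empty_columns => true
}

if [column9] == "4" {
  mutate {
    rename => {
      "column9"  => "[network][type]"
      "column10" => "[pf][tos]"
      "column11" => "[pf][ecn]"
    }
  }
}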

Yet another option is to split the csv into an array and then pop fields from it according to the conditionals. A demonstration of the idea can be found here.
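The linked demonstration isn't reproduced here, but the idea is roughly this; a sketch assuming pfelk_csv is still the split array and only showing the v4 branch:

ruby {
  code => '
    # the first nine columns are common to v4 and v6; shift the
    # remaining values off the array according to the conditionals
    cols = event.get("pfelk_csv")
    if cols.is_a?(Array)
      rest = cols[9..-1] || []
      if event.get("[network][type]") == "4"
        ["[pf][tos]", "[pf][ecn]", "[pf][ttl]"].each do |field|
          value = rest.shift
          event.set(field, value) unless value.nil? || value.to_s.empty?
        end
      end
    end
  '
}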