pfelk Logstash data parsing

Hello, I would appreciate help with Logstash parsing data into Elasticsearch. Please check my log output. I'm using OPNsense syslog into Logstash's pfelk addon and cannot figure out the right mutate filter to get this done. Thanks in advance!

[2023-09-18T19:00:11,393][WARN ][logstash.outputs.elasticsearch][pfelk][9a2fc31c5c61406e9cc6f8d6347eb69f1288022841ff17a7d978e31bc0fa30d3] Could not index event to Elasticsearch. {:status=>400, :action=>["create", {:_id=>nil, :_index=>"logs-pfelk-firewall", :routing=>nil}, {"data_stream"=>{"namespace"=>"firewall", "type"=>"logs", "dataset"=>"pfelk"}, "log"=>{"syslog"=>{"hostname"=>"xxx.dedyn.io", "appname"=>"firewall", "priority"=>134, "severity"=>{"code"=>6, "name"=>"Informational"}, "facility"=>{"code"=>16, "name"=>"local0"}, "procid"=>"56356", "version"=>"1"}}, "type"=>"firewall", "event"=>{"reason"=>"match", "created"=>2023-09-18T17:00:11.000Z, "original"=>"<134>1 2023-09-18T19:00:11+02:00 fw.slote-g.dedyn.io filterlog 56356 - [meta sequenceId=\"1985\"] 77,,,02f4bab031b57d1e30553ce08e0ec131,vtnet0,match,block,in,4,0x0,,1,23097,0,none,17,udp,204,10.10.10.110,239.255.255.250,57488,1900,184\n", "dataset"=>"pfelk.firewall", "action"=>"block", "sequence"=>"1985"}, "network"=>{"direction"=>"ingress", "iana_number"=>"17", "protocol"=>"udp", "type"=>"ipv4"}, "pf"=>{"ttl"=>"1", "rule"=>{"subid"=>""}, "tos"=>"0x0", "id"=>"23097", "flags"=>"none", "packet"=>{}, "ecn"=>"", "anchor"=>"", "offset"=>"0"}, "destination"=>{"ip"=>"239.255.255.250", "service"=>"ssdp", "port"=>"1900"}, "@version"=>"1", "rule"=>{"id"=>"77", "uuid"=>"02f4bab031b57d1e30553ce08e0ec131"}, "tags"=>["pfelk", "firewall", "IP_Private_Source", "IP_Private_Destination"], "service"=>{"type"=>"system"}, "host"=>{"ip"=>"10.10.0.253"}, "pf_csv"=>["77", "", "", "02f4bab031b57d1e30553ce08e0ec131", "vtnet0", "match", "block", "in", "4", "0x0", "", "1", "23097", "0", "none", "17", "udp", "204", "10.10.10.110", "239.255.255.250", "57488", "1900", "184\n"], "interface"=>{"name"=>"vtnet0"}, "@timestamp"=>2023-09-18T17:00:11.151607752Z, "source"=>{"ip"=>"10.10.10.110", "packets"=>"204", "bytes"=>"184\n", "port"=>"57488"}}], :response=>{"create"=>{"_index"=>".ds-logs-pfelk-firewall-2023.09.10-000001", "_id"=>"Mzk8qYoBZN3EEBNfOlt8", "status"=>400, "error"=>{"type"=>"document_parsing_exception", "reason"=>"[1:1478] failed to parse field [source.bytes] of type [long] in document with id 'Mzk8qYoBZN3EEBNfOlt8'. Preview of field's value: '184\n'", "caused_by"=>{"type"=>"illegal_argument_exception", "reason"=>"For input string: \"184\n\""}}}}}

As you can see, most of the logs are parsed successfully, but not all, and I've been stuck for weeks now.

This is really an Elasticsearch question, but I would guess that the Elasticsearch parser for type [long] does not like the newline at the end of the field.

Looking at the value of [event][original], you could try

mutate { gsub => [ "message", "\n$", "" ] }

before you try to parse and remove the [message] field.
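
I don't know the pfelk layout, but generically it would go near the top of the filter section, before whatever parses [message], something like:

filter {
  # strip a trailing newline from the raw line before any parsing
  mutate { gsub => [ "message", "\n$", "" ] }
  # ... existing parsing of [message] follows ...
}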

Thanks for the quick reply, I'm just a bit lost here. Are you familiar with the pfelk conf files? I don't know where exactly in the filter section to add this line; I tried 01-input.pfelk, 02-firewall.pfelk and 50-output.pfelk without any luck.

No, I have no idea what your configuration files look like.

@kozistan are you talking about this?

Hi Rios, yes exactly. Logstash pipelines are redirected to this one.

@Badger I would wonder if you did :smiley:

I would say 02-firewall.pfelk, after line 144:
rename => { "[pf][data_length]" => "[source][bytes]" }

OK, this looks to be fixed.

gsub => [ "[source][bytes]", "\n$", "" ] did the trick, because

rename => { "[pf][data_length]" => "[source][bytes]" } was already there by default.
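
For reference, the combined block now looks like this. A single mutate filter applies rename before gsub regardless of the order they are written in, so the gsub operates on the renamed [source][bytes] field:

mutate {
  gsub => [ "[source][bytes]", "\n$", "" ]
  rename => { "[pf][data_length]" => "[source][bytes]" }
}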

Now I'm facing another one: "_id"=>"AWnVrIoBZN3EEBNfbPcp", "status"=>400, "error"=>{"type"=>"document_parsing_exception", "reason"=>"[1:1807] failed to parse field [pf.tcp.sequence_number] of type [long] in document with id 'AWnVrIoBZN3EEBNfbPcp'. Preview of field's value: '4123231383:4123231414'", "caused_by"=>{"type"=>"illegal_argument_exception", "reason"=>"For input string: \"4123231383:4123231414\""}}}}}

Any suggestions?

This is the current 02-firewall.pfelk:

# 02-firewall.pfelk
################################################################################
# Version: 23.09                                                               #
# Required: True                                                               #
# Description: Enriches pf (firewall) logs (OPNsense/pfSense)                  #
#                                                                              #
################################################################################
#
filter {
### filterlog ###
  if [log][syslog][appname] =~ /^filterlog$/ {
    mutate {
      add_tag => "firewall"
      add_field => { "[event][dataset]" => "pfelk.firewall" }
      replace => { "[log][syslog][appname]" => "firewall" }
      copy => { "filter_message" => "pf_csv" }
    }
    mutate {
      split => { "pf_csv" => "," }
    }
    # [Common Fields]
    # rule.id, pf.rule.subid, pf.anchor, rule.uuid, interface.name, event.reason, event.action, network.direction, network.type
    # [Not ECS compliant fields] pf.rule.subid, pf.anchor
    mutate {
      add_field => {
        "[rule][id]" =>           "%{[pf_csv][0]}"
        "[pf][rule][subid]" =>    "%{[pf_csv][1]}"
        "[pf][anchor]" =>         "%{[pf_csv][2]}"
        "[rule][uuid]" =>         "%{[pf_csv][3]}"
        "[interface][name]" =>    "%{[pf_csv][4]}"
        "[event][reason]" =>      "%{[pf_csv][5]}"
        "[event][action]" =>      "%{[pf_csv][6]}"
        "[network][direction]" => "%{[pf_csv][7]}"
        "[network][type]" =>      "%{[pf_csv][8]}"
      }
    }
    # [IPv4]
    # [ECS compliant fields] network.iana_number, network.protocol, source.ip, destination.ip
    # [Not ECS compliant fields] pf.tos, pf.ecn, pf.ttl, pf.id, pf.offset, pf.flags, pf.packet.length
    if [network][type] == "4" {
      mutate {
        add_field => {
          "[pf][tos]" =>               "%{[pf_csv][9]}"
          "[pf][ecn]" =>               "%{[pf_csv][10]}"
          "[pf][ttl]" =>               "%{[pf_csv][11]}"
          "[pf][id]" =>                "%{[pf_csv][12]}"
          "[pf][offset]" =>            "%{[pf_csv][13]}"
          "[pf][flags]" =>             "%{[pf_csv][14]}"
          "[network][iana_number]" =>  "%{[pf_csv][15]}"
          "[network][protocol]" =>     "%{[pf_csv][16]}"
          "[pf][packet][length]" =>    "%{[pf_csv][17]}"
          "[source][ip]" =>            "%{[pf_csv][18]}"
          "[destination][ip]" =>       "%{[pf_csv][19]}"
        }
      }
      # [TCP]
      # [ECS compliant fields] source.port, destination.port
      # [Not ECS compliant fields] pf.data_length, pf.tcp.flags, pf.tcp.sequence_number, pf.tcp.ack, pf.tcp.window, pf.tcp.urg, pf.tcp.options
      if [network][protocol] == "tcp" {
        mutate {
          add_field => {
            "[source][port]" =>                 "%{[pf_csv][20]}"
            "[destination][port]" =>            "%{[pf_csv][21]}"
            "[pf][data_length]" =>              "%{[pf_csv][22]}"
            "[pf][tcp][flags]" =>               "%{[pf_csv][23]}"
            "[pf][tcp][sequence_number]" =>     "%{[pf_csv][24]}"
            "[pf][tcp][ack]" =>                 "%{[pf_csv][25]}"
            "[pf][tcp][window]" =>              "%{[pf_csv][26]}"
            "[pf][tcp][urg]" =>                 "%{[pf_csv][27]}"
            "[pf][tcp][options]" =>             "%{[pf_csv][28]}"
          }
        }
      }
      # [UDP]
      # [ECS compliant fields] source.port, destination.port 
      # [Not ECS compliant fields] pf.data_length
      if [network][protocol] == "udp" {
        mutate {
          add_field => {
            "[source][port]" =>         "%{[pf_csv][20]}"
            "[destination][port]" =>    "%{[pf_csv][21]}"
            "[pf][data_length]" =>      "%{[pf_csv][22]}"
          }
        }
      }
    }
    # [IPv6]
    # [ECS compliant fields] network.iana_number, network.protocol, source.ip, destination.ip
    # [Not ECS compliant fields] pf.class, pf.flow, pf.hoplimit, pf.packet.length
    if [network][type] == "6" {
      mutate {
        add_field => {
            "[pf][class]" =>              "%{[pf_csv][9]}"
            "[pf][flow]" =>               "%{[pf_csv][10]}"
            "[pf][hoplimit]" =>           "%{[pf_csv][11]}"
            "[network][protocol]" =>      "%{[pf_csv][12]}"
            "[network][iana_number]" =>   "%{[pf_csv][13]}"
            "[pf][packet][length]" =>     "%{[pf_csv][14]}"
            "[source][ip]" =>             "%{[pf_csv][15]}"
            "[destination][ip]" =>        "%{[pf_csv][16]}"
        }
      }
      # [TCP]
      # [ECS compliant fields] source.port, destination.port
      # [Not ECS compliant fields] pf.data_length, pf.tcp.flags, pf.tcp.sequence_number, pf.tcp.ack, pf.tcp.window, pf.tcp.urg, pf.tcp.options
      if [network][protocol] == "tcp" {
        mutate {
          add_field => {
            "[source][port]" =>                 "%{[pf_csv][17]}"
            "[destination][port]" =>            "%{[pf_csv][18]}"
            "[pf][data_length]" =>              "%{[pf_csv][19]}"
            "[pf][tcp][flags]" =>               "%{[pf_csv][20]}"
            "[pf][tcp][sequence_number]" =>     "%{[pf_csv][21]}"
            "[pf][tcp][ack]" =>                 "%{[pf_csv][22]}"
            "[pf][tcp][window]" =>              "%{[pf_csv][23]}"
            "[pf][tcp][urg]" =>                 "%{[pf_csv][24]}"
            "[pf][tcp][options]" =>             "%{[pf_csv][25]}"
          }
        }
      }
      # [UDP]
      # [ECS compliant fields] source.port, destination.port
      # [Not ECS compliant fields] pf.data_length
      if [network][protocol] == "udp" {
        mutate {
          add_field => {
            "[source][port]" =>         "%{[pf_csv][17]}"
            "[destination][port]" =>    "%{[pf_csv][18]}"
            "[pf][data_length]" =>      "%{[pf_csv][19]}"
          }
        }
      }
    }
    # [ECS] Rename values/fields for ECS compliance
    if [network][direction] =~ /^out$/ {
      mutate {
        #add start
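        # note: within a single mutate, rename is applied before gsub, so this gsub runs on the renamed [destination][bytes] field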
        gsub => [ "[destination][bytes]", "\n$", "" ]
        #add end
        rename => { "[pf][data_length]" => "[destination][bytes]" }
        rename => { "[pf][packet][length]" => "[destination][packets]" }
      }
    }
    if [network][direction] =~ /^in$/ {
      mutate {
        #add start
        gsub => [ "[source][bytes]", "\n$", "" ]
        #add end
        rename => { "[pf][data_length]" => "[source][bytes]" }
        rename => { "[pf][packet][length]" => "[source][packets]" }
      }
    }
    if [network][type] == "4" {
      mutate {
        update => { "[network][type]" => "ipv4" }
      }
    }
    if [network][type] == "6" {
      mutate {
        update => { "[network][type]" => "ipv6" }
      }
    }
    if [network][direction] =~ /^in$/ {
      mutate {
        update => { "[network][direction]" => "ingress" }
      }
    }
    if [network][direction] =~ /^out$/ {
      mutate {
        update => { "[network][direction]" => "egress" }
      }
    }
  }
}
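
By the way, a quick way to check a file like this for syntax errors before reloading (the path may differ on your install) is:

bin/logstash -f /etc/logstash/conf.d/02-firewall.pfelk --config.test_and_exit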

OK, I've tried asking ChatGPT about this one and here is the answer:

If modifying the Logstash configuration or updating the mapping in Elasticsearch did not resolve the issue, another possible approach is to transform the value of the pf.tcp.sequence_number field before indexing it into Elasticsearch.

You can use a Logstash filter, such as the ruby filter, to apply custom transformations to the field value. The ruby filter allows you to use Ruby code to manipulate the field value.

Here's an example of how you can modify your Logstash configuration to transform the pf.tcp.sequence_number field:

filter {
  ruby {
    code => '
      if event.get("[pf][tcp][sequence_number]")
        sequence_number = event.get("[pf][tcp][sequence_number]")
        if sequence_number.include?(":")
          start_num, end_num = sequence_number.split(":")
          event.set("[pf][tcp][sequence_number]", start_num.to_i)
          event.set("[pf][tcp][sequence_number_range_end]", end_num.to_i)
        else
          event.set("[pf][tcp][sequence_number]", sequence_number.to_i)
        end
      end
    '
  }
}

In this example, the ruby filter checks if the pf.tcp.sequence_number field exists. If it does, it checks if the field value contains a colon (:). If a colon is present, it assumes that the value represents a range, splits it into two numbers (start_num and end_num), and assigns the start and end numbers as separate fields (pf.tcp.sequence_number and pf.tcp.sequence_number_range_end). If there is no colon, it converts the value to an integer and assigns it to the pf.tcp.sequence_number field.
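
If only the start of the range matters, a simpler alternative (a sketch, not what I ended up using) would be a mutate that strips everything from the colon onwards:

mutate {
  # keep only the first number of a "start:end" range such as "4123231383:4123231414"
  gsub => [ "[pf][tcp][sequence_number]", ":\d+$", "" ]
}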

And yes, this was helpful; modifying 49-cleanup.pfelk did the trick, and I'm now receiving fully parsed logs into Elasticsearch with no errors.

Anyway, thanks guys for your help, I really appreciate it!
This can be closed now.

