ingest-convert.sh output is incorrect for the Palo Alto (panw) ingest pipeline

Hi, I am planning to use Logstash instead of Filebeat for loading and parsing Palo Alto logs into Elasticsearch.
I set up Filebeat once, only to generate the ingest pipeline definition using the "panw" Filebeat module.

However, when I run the provided "ingest-convert.sh" tool to convert the ingest pipeline JSON to a Logstash config, the resulting filter throws various errors, such as fields "not found", when the Logstash pipeline is run. This seems expected, since the filter refers to "weird" fields that look like ingest-pipeline-specific template expressions, e.g. {{_ingest.timestamp}}, which Logstash can't understand.

Is there any way I can convert the Palo Alto panw module's ingest pipeline definition into a working, valid Logstash filter?

Here is the filter that the "ingest-convert.sh" tool generated for panw.

filter {
   # Adds `[event][ingested]` with the literal value `{{_ingest.timestamp}}`.
   # NOTE(review): `{{...}}` is ingest-pipeline template syntax, not a Logstash
   # field reference; this is the placeholder the post reports Logstash cannot
   # understand.
   mutate {
      add_field => {
         "[event][ingested]" => "{{_ingest.timestamp}}"
      }
   }
   # Moves the raw `message` field to `[log][original]`.
   mutate {
      rename => {
         "message" => "[log][original]"
      }
   }
   # Date parsing for three fields, all using the pattern "yyyy/MM/dd HH:mm:ss".
   # Each field gets two stanzas that differ only in the `timezone` option.
   # NOTE(review): no `if` guard is visible on any stanza, so every one of them
   # runs on every event; presumably the source pipeline chose between the
   # with-timezone and without-timezone variants conditionally — confirm
   # against the pipeline JSON.
   # NOTE(review): `timezone => "{{ event.timezone }}"` is a mustache-style
   # placeholder, the same syntax family as `{{_ingest.timestamp}}`.

   # `[_temp_][generated_time]`: no `target` is set on these two stanzas, so
   # the plugin's default destination field applies.
   date {
      match => [
         "[_temp_][generated_time]",
         "yyyy/MM/dd HH:mm:ss"
      ]
   }
   date {
      match => [
         "[_temp_][generated_time]",
         "yyyy/MM/dd HH:mm:ss"
      ]
      timezone => "{{ event.timezone }}"
   }
   # `[event][created]`: parsed and written back to the same field.
   date {
      match => [
         "[event][created]",
         "yyyy/MM/dd HH:mm:ss"
      ]
      target => "[event][created]"
   }
   date {
      match => [
         "[event][created]",
         "yyyy/MM/dd HH:mm:ss"
      ]
      target => "[event][created]"
      timezone => "{{ event.timezone }}"
   }
   # `[event][start]`: parsed and written back to the same field.
   date {
      match => [
         "[event][start]",
         "yyyy/MM/dd HH:mm:ss"
      ]
      target => "[event][start]"
   }
   date {
      match => [
         "[event][start]",
         "yyyy/MM/dd HH:mm:ss"
      ]
      target => "[event][start]"
      timezone => "{{ event.timezone }}"
   }
   # Numeric conversions: one `mutate { convert }` stanza per field, each
   # requesting the type "long".
   # NOTE(review): the Logstash mutate filter documents conversion targets such
   # as "integer", "float", "string" and "boolean"; "long" looks carried over
   # from the ingest pipeline's convert processor — confirm that Logstash
   # accepts it.

   # Client / server counters and ports.
   mutate {
      convert => {
         "[client][bytes]" => "long"
      }
   }
   mutate {
      convert => {
         "[client][packets]" => "long"
      }
   }
   mutate {
      convert => {
         "[client][port]" => "long"
      }
   }
   mutate {
      convert => {
         "[server][bytes]" => "long"
      }
   }
   mutate {
      convert => {
         "[server][packets]" => "long"
      }
   }
   mutate {
      convert => {
         "[server][port]" => "long"
      }
   }
   # Source / destination counters and ports.
   mutate {
      convert => {
         "[source][bytes]" => "long"
      }
   }
   mutate {
      convert => {
         "[source][packets]" => "long"
      }
   }
   mutate {
      convert => {
         "[source][port]" => "long"
      }
   }
   mutate {
      convert => {
         "[destination][bytes]" => "long"
      }
   }
   mutate {
      convert => {
         "[destination][packets]" => "long"
      }
   }
   mutate {
      convert => {
         "[destination][port]" => "long"
      }
   }
   # Network totals, event duration and miscellaneous fields.
   mutate {
      convert => {
         "[network][bytes]" => "long"
      }
   }
   mutate {
      convert => {
         "[network][packets]" => "long"
      }
   }
   mutate {
      convert => {
         "[event][duration]" => "long"
      }
   }
   mutate {
      convert => {
         "[_temp_][labels]" => "long"
      }
   }
   mutate {
      convert => {
         "[panw][panos][sequence_number]" => "long"
      }
   }
   # NAT ports.
   mutate {
      convert => {
         "[source][nat][port]" => "long"
      }
   }
   mutate {
      convert => {
         "[destination][nat][port]" => "long"
      }
   }
   mutate {
      convert => {
         "[client][nat][port]" => "long"
      }
   }
   mutate {
      convert => {
         "[server][nat][port]" => "long"
      }
   }
   
   # Adds `[network][direction]` eight times (inbound, outbound, internal,
   # external, unknown, inbound, outbound, unknown) and `[network][type]` twice
   # (ipv4, ipv6).
   # NOTE(review): none of these stanzas has an `if` guard, so every value is
   # added to every event. The values are mutually exclusive, so presumably
   # each corresponded to a conditional processor in the ingest pipeline whose
   # condition was not converted — confirm against the pipeline JSON.
   mutate {
      add_field => {
         "[network][direction]" => "inbound"
      }
   }
   mutate {
      add_field => {
         "[network][direction]" => "outbound"
      }
   }
   mutate {
      add_field => {
         "[network][direction]" => "internal"
      }
   }
   mutate {
      add_field => {
         "[network][direction]" => "external"
      }
   }
   mutate {
      add_field => {
         "[network][direction]" => "unknown"
      }
   }
   mutate {
      add_field => {
         "[network][direction]" => "inbound"
      }
   }
   mutate {
      add_field => {
         "[network][direction]" => "outbound"
      }
   }
   mutate {
      add_field => {
         "[network][direction]" => "unknown"
      }
   }
   mutate {
      add_field => {
         "[network][type]" => "ipv4"
      }
   }
   mutate {
      add_field => {
         "[network][type]" => "ipv6"
      }
   }
   # Event categorization: adds `[event][kind]`, `[event][category]`,
   # `[event][type]`, `[event][outcome]` and `[event][action]`.
   # NOTE(review): several stanzas add different values to the same field
   # (e.g. kind "event" vs "alert", type "allowed" vs "denied") with no `if`
   # guard visible, so all of them apply to every event. Presumably the
   # per-processor conditions of the ingest pipeline were dropped by the
   # conversion — confirm against the pipeline JSON.

   # Kind / category pair for traffic-style events.
   mutate {
      add_field => {
         "[event][kind]" => "event"
      }
   }
   mutate {
      add_field => {
         "[event][category]" => [
            "network_traffic",
            "network"
         ]
      }
   }
   # Kind / category pair for threat-style events.
   mutate {
      add_field => {
         "[event][kind]" => "alert"
      }
   }
   mutate {
      add_field => {
         "[event][category]" => [
            "security_threat",
            "intrusion_detection",
            "network"
         ]
      }
   }
   # Allowed / denied type and outcome.
   mutate {
      add_field => {
         "[event][type]" => "allowed"
      }
   }
   mutate {
      add_field => {
         "[event][type]" => "denied"
      }
   }
   mutate {
      add_field => {
         "[event][outcome]" => "success"
      }
   }
   # Flow actions, each followed by the `[event][type]` list paired with it.
   mutate {
      add_field => {
         "[event][action]" => "flow_started"
      }
   }
   mutate {
      add_field => {
         "[event][type]" => [
            "start",
            "connection"
         ]
      }
   }
   mutate {
      add_field => {
         "[event][action]" => "flow_terminated"
      }
   }
   mutate {
      add_field => {
         "[event][type]" => [
            "end",
            "connection"
         ]
      }
   }
   mutate {
      add_field => {
         "[event][action]" => "flow_dropped"
      }
   }
   mutate {
      add_field => {
         "[event][type]" => [
            "denied",
            "connection"
         ]
      }
   }
   mutate {
      add_field => {
         "[event][action]" => "flow_denied"
      }
   }
   mutate {
      add_field => {
         "[event][type]" => [
            "denied",
            "connection"
         ]
      }
   }
   # Threat-style actions.
   mutate {
      add_field => {
         "[event][action]" => "data_match"
      }
   }
   mutate {
      add_field => {
         "[event][action]" => "file_match"
      }
   }
   mutate {
      add_field => {
         "[event][action]" => "flood_detected"
      }
   }
   mutate {
      add_field => {
         "[event][action]" => "packet_attack"
      }
   }
   mutate {
      add_field => {
         "[event][action]" => "scan_detected"
      }
   }
   mutate {
      add_field => {
         "[event][action]" => "spyware_detected"
      }
   }
   mutate {
      add_field => {
         "[event][action]" => "url_filtering"
      }
   }
   mutate {
      add_field => {
         "[event][action]" => "virus_detected"
      }
   }
   mutate {
      add_field => {
         "[event][action]" => "exploit_detected"
      }
   }
   mutate {
      add_field => {
         "[event][action]" => "wildfire_verdict"
      }
   }
   mutate {
      add_field => {
         "[event][action]" => "wildfire_virus_detected"
      }
   }
   # Adds `[event][severity]` with each of the values 1 through 5, then
   # `[panw][panos][action]` with four different action names.
   # NOTE(review): no `if` guard is visible, so every stanza applies to every
   # event; presumably each value was selected by a condition in the ingest
   # pipeline — confirm against the pipeline JSON.
   mutate {
      add_field => {
         "[event][severity]" => 1
      }
   }
   mutate {
      add_field => {
         "[event][severity]" => 2
      }
   }
   mutate {
      add_field => {
         "[event][severity]" => 3
      }
   }
   mutate {
      add_field => {
         "[event][severity]" => 4
      }
   }
   mutate {
      add_field => {
         "[event][severity]" => 5
      }
   }
   mutate {
      add_field => {
         "[panw][panos][action]" => "drop-icmp"
      }
   }
   mutate {
      add_field => {
         "[panw][panos][action]" => "reset-both"
      }
   }
   mutate {
      add_field => {
         "[panw][panos][action]" => "reset-client"
      }
   }
   mutate {
      add_field => {
         "[panw][panos][action]" => "reset-server"
      }
   }
   # Adds four entries to `[related][ip]`, one each for the source,
   # destination, source-NAT and destination-NAT addresses.
   # NOTE(review): the values are mustache-style `{{dotted.field}}`
   # placeholders, the same syntax family as `{{_ingest.timestamp}}`;
   # presumably Logstash will not substitute them — confirm.
   mutate {
      add_field => {
         "[related][ip]" => [
            "{{source.ip}}"
         ]
      }
   }
   mutate {
      add_field => {
         "[related][ip]" => [
            "{{destination.ip}}"
         ]
      }
   }
   mutate {
      add_field => {
         "[related][ip]" => [
            "{{source.nat.ip}}"
         ]
      }
   }
   mutate {
      add_field => {
         "[related][ip]" => [
            "{{destination.nat.ip}}"
         ]
      }
   }
   # Geo lookups: source and destination IPs into `[source][geo]` and
   # `[destination][geo]`.
   geoip {
      source => "[source][ip]"
      target => "[source][geo]"
   }
   geoip {
      source => "[destination][ip]"
      target => "[destination][geo]"
   }
   # Second pair of lookups on the same IPs, restricted to the "asn" and
   # "organization_name" fields and written under `[...][as]`.
   # NOTE(review): no database option is set on any geoip stanza; confirm that
   # the database in use actually provides the ASN fields requested here.
   geoip {
      source => "[source][ip]"
      target => "[source][as]"
      fields => [
         "asn",
         "organization_name"
      ]
   }
   geoip {
      source => "[destination][ip]"
      target => "[destination][as]"
      fields => [
         "asn",
         "organization_name"
      ]
   }
   # Renames the lookup results: `asn` -> `number`,
   # `organization_name` -> `organization.name`.
   mutate {
      rename => {
         "[source][as][asn]" => "[source][as][number]"
      }
   }
   mutate {
      rename => {
         "[source][as][organization_name]" => "[source][as][organization][name]"
      }
   }
   mutate {
      rename => {
         "[destination][as][asn]" => "[destination][as][number]"
      }
   }
   mutate {
      rename => {
         "[destination][as][organization_name]" => "[destination][as][organization][name]"
      }
   }
   # Moves the temporary location fields into the geo objects.
   mutate {
      rename => {
         "[_temp_][srcloc]" => "[source][geo][name]"
      }
   }
   mutate {
      rename => {
         "[_temp_][dstloc]" => "[destination][geo][name]"
      }
   }
   # Adds `[network][community_id]` from a mustache-style placeholder.
   # NOTE(review): same `{{...}}` template syntax as `{{_ingest.timestamp}}`.
   mutate {
      add_field => {
         "[network][community_id]" => [
            "{{panw.panos.network.nat.community_id}}"
         ]
      }
   }
   # Splits the threat name into a name part and a parenthesised id part.
   # NOTE(review): the match key is the dotted name "panw.panos.threat.name",
   # while every other field in this filter uses bracket notation such as
   # `[panw][panos][threat][name]`; confirm which field grok actually reads.
   grok {
      match => {
         "panw.panos.threat.name" => "%{GREEDYDATA:[panw][panos][threat][name]}\(\s*%{GREEDYDATA:[panw][panos][threat][id]}\s*\)"
      }
   }
   # NOTE(review): added with no `if` guard visible, so "URL-filtering" is
   # added to the threat name on every event.
   mutate {
      add_field => {
         "[panw][panos][threat][name]" => "URL-filtering"
      }
   }
   # Adds `[rule][name]` from a mustache-style `{{...}}` placeholder.
   mutate {
      add_field => {
         "[rule][name]" => "{{panw.panos.ruleset}}"
      }
   }
   # Moves `[url][original]` to `[file][name]`.
   # NOTE(review): the grok immediately below matches on "url.original", which
   # this rename has just moved away; presumably the rename was conditional in
   # the ingest pipeline — confirm.
   mutate {
      rename => {
         "[url][original]" => "[file][name]"
      }
   }
   # Breaks a URL into scheme, username, password, domain, port, path, query
   # and fragment using the custom patterns defined below.
   # NOTE(review): dotted match key "url.original", as above.
   grok {
      match => {
         "url.original" => "(%{ANY:[url][scheme]}\:\/\/)?(%{USERNAME:[url][username]}(\:%{PASSWORD:[url][password]})?\@)?%{DOMAIN:[url][domain]}(\:%{POSINT:[url][port]})?(%{PATH:[url][path]})?(\?%{QUERY:[url][query]})?(\#%{ANY:[url][fragment]})?"
      }
      pattern_definitions => {
         "PASSWORD" => "[^@]*"
         "DOMAIN" => "[^\/\?#\:]*"
         "PATH" => "[^\?#]*"
         "QUERY" => "[^#]*"
         "ANY" => ".*"
         "USERNAME" => "[^\:]*"
      }
   }
   # Captures an extension from the URL path into `[url][extension]`.
   # NOTE(review): the regex contains `\][` where a literal-dot separator would
   # be expected after FILENAME ("[^\.]+"); this looks like a `.` -> `][`
   # field-name rewrite applied inside the pattern — confirm against the
   # pipeline JSON. Dotted match key "url.path", as above.
   grok {
      match => {
         "url.path" => "%{FILENAME}((?:\][%{ANY})*(\][%{ANY:[url][extension]}))?"
      }
      pattern_definitions => {
         "FILENAME" => "[^\.]+"
         "ANY" => ".*"
      }
   }
   # Same extraction for `file.name` into `[file][extension]`; the same
   # `\][` and dotted-key review notes apply.
   grok {
      match => {
         "file.name" => "%{FILENAME}((?:\][%{ANY})*(\][%{ANY:[file][extension]}))?"
      }
      pattern_definitions => {
         "ANY" => ".*"
         "FILENAME" => "[^\.]+"
      }
   }
   # Adds entries to `[related][user]`, `[related][hash]` and
   # `[related][hosts]`.
   # NOTE(review): every value is a mustache-style `{{dotted.field}}`
   # placeholder, the same syntax family as `{{_ingest.timestamp}}`, and no
   # `if` guard is visible — so each stanza adds its value to every event,
   # whether or not the referenced field exists. Confirm against the pipeline
   # JSON.
   mutate {
      add_field => {
         "[related][user]" => "{{client.user.name}}"
      }
   }
   mutate {
      add_field => {
         "[related][user]" => "{{source.user.name}}"
      }
   }
   mutate {
      add_field => {
         "[related][user]" => "{{server.user.name}}"
      }
   }
   mutate {
      add_field => {
         "[related][user]" => "{{destination.user.name}}"
      }
   }
   mutate {
      add_field => {
         "[related][user]" => "{{url.username}}"
      }
   }
   mutate {
      add_field => {
         "[related][hash]" => "{{panw.panos.file.hash}}"
      }
   }
   mutate {
      add_field => {
         "[related][hosts]" => "{{observer.hostname}}"
      }
   }
   mutate {
      add_field => {
         "[related][hosts]" => "{{url.domain}}"
      }
   }
}

...Based on further experiments and on reading the Beats module source code, it seems that the Filebeat panw module does the initial parsing using the CSV processor, and the ingest pipeline then does "post-processing" and some transforms into the final ECS schema that shows up in Kibana.

So fields like "_temp_" etc. are injected by the Filebeat module to be used by the ingest pipeline.

So now my question is: is there a way to do this in Logstash, or has someone already done it and can share their config?

I.e., the entire Filebeat panw module + ingest pipeline processing for Palo Alto logs, done in Logstash?

This topic was automatically closed 28 days after the last reply. New replies are no longer allowed.