ingest-convert.sh output incorrect for the Palo Alto (panw) ingest pipeline

Hi, I am planning to use Logstash instead of Filebeat for loading and parsing Palo Alto logs into Elasticsearch.
I set up Filebeat once, solely to generate the ingest pipeline definition using the "panw" Filebeat module.

However, when I run the provided "ingest-convert.sh" tool to convert the ingest pipeline JSON to a Logstash config, the resulting filter throws various errors, such as fields "not found", when the Logstash pipeline is run. This seems expected, since the filter refers to "weird" fields that look like ingest-pipeline-specific template fields, e.g. {{_ingest.timestamp}}, which Logstash can't understand.

Is there any way I can convert the Palo Alto panw module ingest pipeline definition into a working, valid Logstash filter?

Here is the filter that the "ingest-convert.sh" tool generated for panw:

# Logstash filter emitted by ingest-convert.sh for the panw ingest pipeline,
# reproduced verbatim; only the `#` review comments are additions.
# NOTE(review): no plugin below is wrapped in an `if`, so every one runs for
# every event. Presumably the ingest processors carried `if` conditions that
# were not converted — confirm against the pipeline JSON.
filter {
   # NOTE(review): "{{...}}" is Mustache template syntax from the ingest
   # pipeline. Logstash field references use "%{[field]}", so this value is
   # presumably stored as the literal string — confirm.
   mutate {
      add_field => {
         "[event][ingested]" => "{{_ingest.timestamp}}"
      }
   }
   # Moves the raw line from "message" to "[log][original]".
   mutate {
      rename => {
         "message" => "[log][original]"
      }
   }
   # The date filters below come in pairs over the same source field: one
   # without a timezone and one with a templated timezone.
   # NOTE(review): presumably only one of each pair was meant to run,
   # depending on whether event.timezone is set — confirm.
   # No `target` is given for the first pair, so the date filter's default
   # target is used.
   date {
      match => [
         "[_temp_][generated_time]",
         "yyyy/MM/dd HH:mm:ss"
      ]
   }
   # NOTE(review): "{{ event.timezone }}" is a Mustache placeholder, not a
   # timezone ID or a Logstash "%{...}" reference — expected to be rejected or
   # mis-parsed; confirm.
   date {
      match => [
         "[_temp_][generated_time]",
         "yyyy/MM/dd HH:mm:ss"
      ]
      timezone => "{{ event.timezone }}"
   }
   # Parses [event][created] in place (source and target are the same field).
   date {
      match => [
         "[event][created]",
         "yyyy/MM/dd HH:mm:ss"
      ]
      target => "[event][created]"
   }
   date {
      match => [
         "[event][created]",
         "yyyy/MM/dd HH:mm:ss"
      ]
      target => "[event][created]"
      timezone => "{{ event.timezone }}"
   }
   # Parses [event][start] in place (source and target are the same field).
   date {
      match => [
         "[event][start]",
         "yyyy/MM/dd HH:mm:ss"
      ]
      target => "[event][start]"
   }
   date {
      match => [
         "[event][start]",
         "yyyy/MM/dd HH:mm:ss"
      ]
      target => "[event][start]"
      timezone => "{{ event.timezone }}"
   }
   # Numeric conversions, one mutate per field.
   # NOTE(review): "long" is the ingest convert processor's type name; the
   # Logstash mutate filter documents integer/float/string/boolean style
   # targets, so "long" is presumably rejected at config load — confirm.
   mutate {
      convert => {
         "[client][bytes]" => "long"
      }
   }
   mutate {
      convert => {
         "[client][packets]" => "long"
      }
   }
   mutate {
      convert => {
         "[client][port]" => "long"
      }
   }
   mutate {
      convert => {
         "[server][bytes]" => "long"
      }
   }
   mutate {
      convert => {
         "[server][packets]" => "long"
      }
   }
   mutate {
      convert => {
         "[server][port]" => "long"
      }
   }
   mutate {
      convert => {
         "[source][bytes]" => "long"
      }
   }
   mutate {
      convert => {
         "[source][packets]" => "long"
      }
   }
   mutate {
      convert => {
         "[source][port]" => "long"
      }
   }
   mutate {
      convert => {
         "[destination][bytes]" => "long"
      }
   }
   mutate {
      convert => {
         "[destination][packets]" => "long"
      }
   }
   mutate {
      convert => {
         "[destination][port]" => "long"
      }
   }
   mutate {
      convert => {
         "[network][bytes]" => "long"
      }
   }
   mutate {
      convert => {
         "[network][packets]" => "long"
      }
   }
   mutate {
      convert => {
         "[event][duration]" => "long"
      }
   }
   mutate {
      convert => {
         "[_temp_][labels]" => "long"
      }
   }
   mutate {
      convert => {
         "[panw][panos][sequence_number]" => "long"
      }
   }
   mutate {
      convert => {
         "[source][nat][port]" => "long"
      }
   }
   mutate {
      convert => {
         "[destination][nat][port]" => "long"
      }
   }
   mutate {
      convert => {
         "[client][nat][port]" => "long"
      }
   }
   mutate {
      convert => {
         "[server][nat][port]" => "long"
      }
   }
   
   # NOTE(review): from here on, many mutually exclusive values are added to
   # the same field unconditionally (e.g. inbound/outbound/internal/external/
   # unknown). Presumably each add_field was guarded by an `if` in the ingest
   # pipeline; as written every event receives all of them — confirm.
   mutate {
      add_field => {
         "[network][direction]" => "inbound"
      }
   }
   mutate {
      add_field => {
         "[network][direction]" => "outbound"
      }
   }
   mutate {
      add_field => {
         "[network][direction]" => "internal"
      }
   }
   mutate {
      add_field => {
         "[network][direction]" => "external"
      }
   }
   mutate {
      add_field => {
         "[network][direction]" => "unknown"
      }
   }
   mutate {
      add_field => {
         "[network][direction]" => "inbound"
      }
   }
   mutate {
      add_field => {
         "[network][direction]" => "outbound"
      }
   }
   mutate {
      add_field => {
         "[network][direction]" => "unknown"
      }
   }
   mutate {
      add_field => {
         "[network][type]" => "ipv4"
      }
   }
   mutate {
      add_field => {
         "[network][type]" => "ipv6"
      }
   }
   # Both "event" and "alert" kinds (and both category lists) are added
   # without a condition.
   mutate {
      add_field => {
         "[event][kind]" => "event"
      }
   }
   mutate {
      add_field => {
         "[event][category]" => [
            "network_traffic",
            "network"
         ]
      }
   }
   mutate {
      add_field => {
         "[event][kind]" => "alert"
      }
   }
   mutate {
      add_field => {
         "[event][category]" => [
            "security_threat",
            "intrusion_detection",
            "network"
         ]
      }
   }
   mutate {
      add_field => {
         "[event][type]" => "allowed"
      }
   }
   mutate {
      add_field => {
         "[event][type]" => "denied"
      }
   }
   mutate {
      add_field => {
         "[event][outcome]" => "success"
      }
   }
   # Flow actions, each paired with an [event][type] list.
   mutate {
      add_field => {
         "[event][action]" => "flow_started"
      }
   }
   mutate {
      add_field => {
         "[event][type]" => [
            "start",
            "connection"
         ]
      }
   }
   mutate {
      add_field => {
         "[event][action]" => "flow_terminated"
      }
   }
   mutate {
      add_field => {
         "[event][type]" => [
            "end",
            "connection"
         ]
      }
   }
   mutate {
      add_field => {
         "[event][action]" => "flow_dropped"
      }
   }
   mutate {
      add_field => {
         "[event][type]" => [
            "denied",
            "connection"
         ]
      }
   }
   mutate {
      add_field => {
         "[event][action]" => "flow_denied"
      }
   }
   mutate {
      add_field => {
         "[event][type]" => [
            "denied",
            "connection"
         ]
      }
   }
   # Threat-related actions.
   mutate {
      add_field => {
         "[event][action]" => "data_match"
      }
   }
   mutate {
      add_field => {
         "[event][action]" => "file_match"
      }
   }
   mutate {
      add_field => {
         "[event][action]" => "flood_detected"
      }
   }
   mutate {
      add_field => {
         "[event][action]" => "packet_attack"
      }
   }
   mutate {
      add_field => {
         "[event][action]" => "scan_detected"
      }
   }
   mutate {
      add_field => {
         "[event][action]" => "spyware_detected"
      }
   }
   mutate {
      add_field => {
         "[event][action]" => "url_filtering"
      }
   }
   mutate {
      add_field => {
         "[event][action]" => "virus_detected"
      }
   }
   mutate {
      add_field => {
         "[event][action]" => "exploit_detected"
      }
   }
   mutate {
      add_field => {
         "[event][action]" => "wildfire_verdict"
      }
   }
   mutate {
      add_field => {
         "[event][action]" => "wildfire_virus_detected"
      }
   }
   # Severity levels 1 through 5, all added unconditionally.
   mutate {
      add_field => {
         "[event][severity]" => 1
      }
   }
   mutate {
      add_field => {
         "[event][severity]" => 2
      }
   }
   mutate {
      add_field => {
         "[event][severity]" => 3
      }
   }
   mutate {
      add_field => {
         "[event][severity]" => 4
      }
   }
   mutate {
      add_field => {
         "[event][severity]" => 5
      }
   }
   mutate {
      add_field => {
         "[panw][panos][action]" => "drop-icmp"
      }
   }
   mutate {
      add_field => {
         "[panw][panos][action]" => "reset-both"
      }
   }
   mutate {
      add_field => {
         "[panw][panos][action]" => "reset-client"
      }
   }
   mutate {
      add_field => {
         "[panw][panos][action]" => "reset-server"
      }
   }
   # NOTE(review): the [related][ip] values below are Mustache placeholders
   # with dotted paths; a Logstash equivalent would presumably be
   # "%{[source][ip]}" etc. — confirm.
   mutate {
      add_field => {
         "[related][ip]" => [
            "{{source.ip}}"
         ]
      }
   }
   mutate {
      add_field => {
         "[related][ip]" => [
            "{{destination.ip}}"
         ]
      }
   }
   mutate {
      add_field => {
         "[related][ip]" => [
            "{{source.nat.ip}}"
         ]
      }
   }
   mutate {
      add_field => {
         "[related][ip]" => [
            "{{destination.nat.ip}}"
         ]
      }
   }
   # GeoIP enrichment of the source and destination addresses.
   geoip {
      source => "[source][ip]"
      target => "[source][geo]"
   }
   geoip {
      source => "[destination][ip]"
      target => "[destination][geo]"
   }
   # AS lookups requesting only "asn" and "organization_name".
   # NOTE(review): no `database` option is set here; presumably an ASN
   # database must be configured for these fields to resolve — confirm.
   geoip {
      source => "[source][ip]"
      target => "[source][as]"
      fields => [
         "asn",
         "organization_name"
      ]
   }
   geoip {
      source => "[destination][ip]"
      target => "[destination][as]"
      fields => [
         "asn",
         "organization_name"
      ]
   }
   # Renames the AS lookup output to [as][number] / [as][organization][name].
   mutate {
      rename => {
         "[source][as][asn]" => "[source][as][number]"
      }
   }
   mutate {
      rename => {
         "[source][as][organization_name]" => "[source][as][organization][name]"
      }
   }
   mutate {
      rename => {
         "[destination][as][asn]" => "[destination][as][number]"
      }
   }
   mutate {
      rename => {
         "[destination][as][organization_name]" => "[destination][as][organization][name]"
      }
   }
   # Moves the temporary location fields under [geo][name].
   mutate {
      rename => {
         "[_temp_][srcloc]" => "[source][geo][name]"
      }
   }
   mutate {
      rename => {
         "[_temp_][dstloc]" => "[destination][geo][name]"
      }
   }
   mutate {
      add_field => {
         "[network][community_id]" => [
            "{{panw.panos.network.nat.community_id}}"
         ]
      }
   }
   # NOTE(review): the grok source key is the dotted name
   # "panw.panos.threat.name", while the rest of this config uses bracket
   # references such as [panw][panos][threat][name]. Presumably Logstash looks
   # for a top-level field with that literal dotted name — confirm.
   grok {
      match => {
         "panw.panos.threat.name" => "%{GREEDYDATA:[panw][panos][threat][name]}\(\s*%{GREEDYDATA:[panw][panos][threat][id]}\s*\)"
      }
   }
   mutate {
      add_field => {
         "[panw][panos][threat][name]" => "URL-filtering"
      }
   }
   mutate {
      add_field => {
         "[rule][name]" => "{{panw.panos.ruleset}}"
      }
   }
   # NOTE(review): this unconditional rename moves [url][original] away before
   # the grok below tries to read it — presumably it was conditional in the
   # ingest pipeline; confirm.
   mutate {
      rename => {
         "[url][original]" => "[file][name]"
      }
   }
   # Splits a URL into scheme/username/password/domain/port/path/query/
   # fragment using the custom patterns defined in pattern_definitions.
   # Same dotted-key concern as above for "url.original".
   grok {
      match => {
         "url.original" => "(%{ANY:[url][scheme]}\:\/\/)?(%{USERNAME:[url][username]}(\:%{PASSWORD:[url][password]})?\@)?%{DOMAIN:[url][domain]}(\:%{POSINT:[url][port]})?(%{PATH:[url][path]})?(\?%{QUERY:[url][query]})?(\#%{ANY:[url][fragment]})?"
      }
      pattern_definitions => {
         "PASSWORD" => "[^@]*"
         "DOMAIN" => "[^\/\?#\:]*"
         "PATH" => "[^\?#]*"
         "QUERY" => "[^#]*"
         "ANY" => ".*"
         "USERNAME" => "[^\:]*"
      }
   }
   # Captures a file extension from the URL path.
   # NOTE(review): the "\][" sequences look like a mangled escape (presumably
   # a literal dot, "\.", in the ingest pipeline); as written "[" opens an
   # unintended character class — confirm against the pipeline JSON.
   grok {
      match => {
         "url.path" => "%{FILENAME}((?:\][%{ANY})*(\][%{ANY:[url][extension]}))?"
      }
      pattern_definitions => {
         "FILENAME" => "[^\.]+"
         "ANY" => ".*"
      }
   }
   # Same extension extraction for the file name; same "\][" concern.
   grok {
      match => {
         "file.name" => "%{FILENAME}((?:\][%{ANY})*(\][%{ANY:[file][extension]}))?"
      }
      pattern_definitions => {
         "ANY" => ".*"
         "FILENAME" => "[^\.]+"
      }
   }
   # [related][*] fields, again populated from Mustache placeholders rather
   # than Logstash "%{...}" references.
   mutate {
      add_field => {
         "[related][user]" => "{{client.user.name}}"
      }
   }
   mutate {
      add_field => {
         "[related][user]" => "{{source.user.name}}"
      }
   }
   mutate {
      add_field => {
         "[related][user]" => "{{server.user.name}}"
      }
   }
   mutate {
      add_field => {
         "[related][user]" => "{{destination.user.name}}"
      }
   }
   mutate {
      add_field => {
         "[related][user]" => "{{url.username}}"
      }
   }
   mutate {
      add_field => {
         "[related][hash]" => "{{panw.panos.file.hash}}"
      }
   }
   mutate {
      add_field => {
         "[related][hosts]" => "{{observer.hostname}}"
      }
   }
   mutate {
      add_field => {
         "[related][hosts]" => "{{url.domain}}"
      }
   }
}

Based on further experiments and on reading the Beats module source code, it seems that the Filebeat panw module does the initial parsing using the CSV processor, and the ingest pipeline then does the "post-processing" and some transforms into the final ECS schema that shows up in Kibana.

So fields like "_temp_" etc. are injected by the Filebeat module to be used by the ingest pipeline.

So now my question is: is there a way to do this in Logstash, or has someone already done it and can share their config?

I.e., the entire Filebeat panw module + ingest pipeline processing for Palo Alto logs, done in Logstash?