Hi, I am planning to use Logstash instead of Filebeats for loading and parsing palo alto logs to Elastic.
I setup the Filebeats as a one-time to generate the Ingest Pipeline Definition using the "pawn" Filebeats module.
However, when i run the provided "ingest-convert.sh" tool to convert Ingest Pipeline JSONs to Logstash Configs, the resultant filter throws various errors like fields "not found" when the Logstash pipeline is run ...which seems obvious since the filter refers to "weird" fields which look like ingest pipeline processor specific fields Eg:- {{_ingest.timestamp}} which Logstash can't understand.
Is there any way i can convert the palo alto pawn module ingest pipeline definition to a working and valid Logstash filter?
Here is the filter that the "ingest-convert.sh" tool generated for pawn.
filter {
mutate {
add_field => {
"[event][ingested]" => "{{_ingest.timestamp}}"
}
}
mutate {
rename => {
"message" => "[log][original]"
}
}
date {
match => [
"[_temp_][generated_time]",
"yyyy/MM/dd HH:mm:ss"
]
}
date {
match => [
"[_temp_][generated_time]",
"yyyy/MM/dd HH:mm:ss"
]
timezone => "{{ event.timezone }}"
}
date {
match => [
"[event][created]",
"yyyy/MM/dd HH:mm:ss"
]
target => "[event][created]"
}
date {
match => [
"[event][created]",
"yyyy/MM/dd HH:mm:ss"
]
target => "[event][created]"
timezone => "{{ event.timezone }}"
}
date {
match => [
"[event][start]",
"yyyy/MM/dd HH:mm:ss"
]
target => "[event][start]"
}
date {
match => [
"[event][start]",
"yyyy/MM/dd HH:mm:ss"
]
target => "[event][start]"
timezone => "{{ event.timezone }}"
}
mutate {
convert => {
"[client][bytes]" => "long"
}
}
mutate {
convert => {
"[client][packets]" => "long"
}
}
mutate {
convert => {
"[client][port]" => "long"
}
}
mutate {
convert => {
"[server][bytes]" => "long"
}
}
mutate {
convert => {
"[server][packets]" => "long"
}
}
mutate {
convert => {
"[server][port]" => "long"
}
}
mutate {
convert => {
"[source][bytes]" => "long"
}
}
mutate {
convert => {
"[source][packets]" => "long"
}
}
mutate {
convert => {
"[source][port]" => "long"
}
}
mutate {
convert => {
"[destination][bytes]" => "long"
}
}
mutate {
convert => {
"[destination][packets]" => "long"
}
}
mutate {
convert => {
"[destination][port]" => "long"
}
}
mutate {
convert => {
"[network][bytes]" => "long"
}
}
mutate {
convert => {
"[network][packets]" => "long"
}
}
mutate {
convert => {
"[event][duration]" => "long"
}
}
mutate {
convert => {
"[_temp_][labels]" => "long"
}
}
mutate {
convert => {
"[panw][panos][sequence_number]" => "long"
}
}
mutate {
convert => {
"[source][nat][port]" => "long"
}
}
mutate {
convert => {
"[destination][nat][port]" => "long"
}
}
mutate {
convert => {
"[client][nat][port]" => "long"
}
}
mutate {
convert => {
"[server][nat][port]" => "long"
}
}
mutate {
add_field => {
"[network][direction]" => "inbound"
}
}
mutate {
add_field => {
"[network][direction]" => "outbound"
}
}
mutate {
add_field => {
"[network][direction]" => "internal"
}
}
mutate {
add_field => {
"[network][direction]" => "external"
}
}
mutate {
add_field => {
"[network][direction]" => "unknown"
}
}
mutate {
add_field => {
"[network][direction]" => "inbound"
}
}
mutate {
add_field => {
"[network][direction]" => "outbound"
}
}
mutate {
add_field => {
"[network][direction]" => "unknown"
}
}
mutate {
add_field => {
"[network][type]" => "ipv4"
}
}
mutate {
add_field => {
"[network][type]" => "ipv6"
}
}
mutate {
add_field => {
"[event][kind]" => "event"
}
}
mutate {
add_field => {
"[event][category]" => [
"network_traffic",
"network"
]
}
}
mutate {
add_field => {
"[event][kind]" => "alert"
}
}
mutate {
add_field => {
"[event][category]" => [
"security_threat",
"intrusion_detection",
"network"
]
}
}
mutate {
add_field => {
"[event][type]" => "allowed"
}
}
mutate {
add_field => {
"[event][type]" => "denied"
}
}
mutate {
add_field => {
"[event][outcome]" => "success"
}
}
mutate {
add_field => {
"[event][action]" => "flow_started"
}
}
mutate {
add_field => {
"[event][type]" => [
"start",
"connection"
]
}
}
mutate {
add_field => {
"[event][action]" => "flow_terminated"
}
}
mutate {
add_field => {
"[event][type]" => [
"end",
"connection"
]
}
}
mutate {
add_field => {
"[event][action]" => "flow_dropped"
}
}
mutate {
add_field => {
"[event][type]" => [
"denied",
"connection"
]
}
}
mutate {
add_field => {
"[event][action]" => "flow_denied"
}
}
mutate {
add_field => {
"[event][type]" => [
"denied",
"connection"
]
}
}
mutate {
add_field => {
"[event][action]" => "data_match"
}
}
mutate {
add_field => {
"[event][action]" => "file_match"
}
}
mutate {
add_field => {
"[event][action]" => "flood_detected"
}
}
mutate {
add_field => {
"[event][action]" => "packet_attack"
}
}
mutate {
add_field => {
"[event][action]" => "scan_detected"
}
}
mutate {
add_field => {
"[event][action]" => "spyware_detected"
}
}
mutate {
add_field => {
"[event][action]" => "url_filtering"
}
}
mutate {
add_field => {
"[event][action]" => "virus_detected"
}
}
mutate {
add_field => {
"[event][action]" => "exploit_detected"
}
}
mutate {
add_field => {
"[event][action]" => "wildfire_verdict"
}
}
mutate {
add_field => {
"[event][action]" => "wildfire_virus_detected"
}
}
mutate {
add_field => {
"[event][severity]" => 1
}
}
mutate {
add_field => {
"[event][severity]" => 2
}
}
mutate {
add_field => {
"[event][severity]" => 3
}
}
mutate {
add_field => {
"[event][severity]" => 4
}
}
mutate {
add_field => {
"[event][severity]" => 5
}
}
mutate {
add_field => {
"[panw][panos][action]" => "drop-icmp"
}
}
mutate {
add_field => {
"[panw][panos][action]" => "reset-both"
}
}
mutate {
add_field => {
"[panw][panos][action]" => "reset-client"
}
}
mutate {
add_field => {
"[panw][panos][action]" => "reset-server"
}
}
mutate {
add_field => {
"[related][ip]" => [
"{{source.ip}}"
]
}
}
mutate {
add_field => {
"[related][ip]" => [
"{{destination.ip}}"
]
}
}
mutate {
add_field => {
"[related][ip]" => [
"{{source.nat.ip}}"
]
}
}
mutate {
add_field => {
"[related][ip]" => [
"{{destination.nat.ip}}"
]
}
}
geoip {
source => "[source][ip]"
target => "[source][geo]"
}
geoip {
source => "[destination][ip]"
target => "[destination][geo]"
}
geoip {
source => "[source][ip]"
target => "[source][as]"
fields => [
"asn",
"organization_name"
]
}
geoip {
source => "[destination][ip]"
target => "[destination][as]"
fields => [
"asn",
"organization_name"
]
}
mutate {
rename => {
"[source][as][asn]" => "[source][as][number]"
}
}
mutate {
rename => {
"[source][as][organization_name]" => "[source][as][organization][name]"
}
}
mutate {
rename => {
"[destination][as][asn]" => "[destination][as][number]"
}
}
mutate {
rename => {
"[destination][as][organization_name]" => "[destination][as][organization][name]"
}
}
mutate {
rename => {
"[_temp_][srcloc]" => "[source][geo][name]"
}
}
mutate {
rename => {
"[_temp_][dstloc]" => "[destination][geo][name]"
}
}
mutate {
add_field => {
"[network][community_id]" => [
"{{panw.panos.network.nat.community_id}}"
]
}
}
grok {
match => {
"panw.panos.threat.name" => "%{GREEDYDATA:[panw][panos][threat][name]}\(\s*%{GREEDYDATA:[panw][panos][threat][id]}\s*\)"
}
}
mutate {
add_field => {
"[panw][panos][threat][name]" => "URL-filtering"
}
}
mutate {
add_field => {
"[rule][name]" => "{{panw.panos.ruleset}}"
}
}
mutate {
rename => {
"[url][original]" => "[file][name]"
}
}
grok {
match => {
"url.original" => "(%{ANY:[url][scheme]}\:\/\/)?(%{USERNAME:[url][username]}(\:%{PASSWORD:[url][password]})?\@)?%{DOMAIN:[url][domain]}(\:%{POSINT:[url][port]})?(%{PATH:[url][path]})?(\?%{QUERY:[url][query]})?(\#%{ANY:[url][fragment]})?"
}
pattern_definitions => {
"PASSWORD" => "[^@]*"
"DOMAIN" => "[^\/\?#\:]*"
"PATH" => "[^\?#]*"
"QUERY" => "[^#]*"
"ANY" => ".*"
"USERNAME" => "[^\:]*"
}
}
grok {
match => {
"url.path" => "%{FILENAME}((?:\][%{ANY})*(\][%{ANY:[url][extension]}))?"
}
pattern_definitions => {
"FILENAME" => "[^\.]+"
"ANY" => ".*"
}
}
grok {
match => {
"file.name" => "%{FILENAME}((?:\][%{ANY})*(\][%{ANY:[file][extension]}))?"
}
pattern_definitions => {
"ANY" => ".*"
"FILENAME" => "[^\.]+"
}
}
mutate {
add_field => {
"[related][user]" => "{{client.user.name}}"
}
}
mutate {
add_field => {
"[related][user]" => "{{source.user.name}}"
}
}
mutate {
add_field => {
"[related][user]" => "{{server.user.name}}"
}
}
mutate {
add_field => {
"[related][user]" => "{{destination.user.name}}"
}
}
mutate {
add_field => {
"[related][user]" => "{{url.username}}"
}
}
mutate {
add_field => {
"[related][hash]" => "{{panw.panos.file.hash}}"
}
}
mutate {
add_field => {
"[related][hosts]" => "{{observer.hostname}}"
}
}
mutate {
add_field => {
"[related][hosts]" => "{{url.domain}}"
}
}
}