Thanks a lot for your response.
I think the bottleneck is in Logstash, because every time I restart Logstash it runs normally for a few minutes and then processing slows down again...
Here is my filter:
# Listen on the loopback interface (port 10514) for JSON-encoded events and
# tag them so the filter section can recognise PAN-OS traffic.
input {
  tcp {
    host => "127.0.0.1"
    port => 10514
    # TCP is a byte stream with no message boundaries, so use the
    # line-framed JSON codec rather than "json".
    # NOTE(review): assumes the sender emits one JSON document per line -
    # confirm against the rsyslog template.
    codec => "json_lines"
    type => "rsyslog"
    tags => ["PAN-OS_syslog"]
    tcp_keep_alive => true
  }
}
# ***NOTE on PRUNE***
# PA seems to add fields in patches, and some fields do not apply to most environments.
# So the prune filter removes all undefined and unneeded fields, storing only what your environment uses.
# If you want more fields than are being stored, add them to the whitelist in the prune statement.
# This also eliminates the columnX fields in the created document.
filter {
if "PAN-OS_syslog" in [tags] {
# Log types are "TRAFFIC", "THREAT", "CONFIG", "SYSTEM" and "HIP-MATCH".
# Traffic log fields: https://docs.paloaltonetworks.com/pan-os/10-0/pan-os-admin/monitoring/use-syslog-for-monitoring/syslog-field-descriptions/traffic-log-fields.html
# Converted field names to CIM fields where possible; this allows use of CIM Traffic visualizations and reports
# TRAFFIC logs: parse the CSV payload, keep only the whitelisted fields,
# convert counters to integers and tag the event for the output section.
# The log type is its own CSV column, so it is matched together with its
# delimiting commas; a bare /TRAFFIC/ would also match the word inside a URL
# or description of another log type.
if [message] =~ /,TRAFFIC,/ {
  csv {
    source => "message"
    # Column names must be unique: the csv filter stores each column under its
    # name, so a repeated name lets a later column overwrite an earlier one.
    # The original list repeated "serial_number" and "session_owner"; the
    # later columns now have their own names.
    # NOTE(review): the names chosen for the columns after "host_id" follow
    # the PAN-OS traffic log field order - confirm against your PAN-OS version.
    columns => [
      "receive_time", "serial_number", "log_type", "content_type", "Config Version", "generated_time", "src_ip", "dest_ip",
      "src_translated_ip", "dest_translated_ip", "rule", "user", "dest_user", "app", "virtual_system", "src_zone", "dest_zone",
      "src_interface", "dest_interface", "log_profile", "Time Logged", "session_id", "repeat_count", "src_port", "dest_port",
      "src_translated_port", "dest_translated_port", "tcp_flag", "protocol", "action", "Bytes", "Bytes Sent", "Bytes Received",
      "packets", "start", "Elapsed Time", "category", "FUTURE_USE", "seqno", "action_flags", "src_location", "dest_location",
      "FUTURE_USE", "packets_out", "packets_in", "session_end_reason", "dg_hier_level_1", "dg_hier_level_2", "dg_hier_level_3",
      "dg_hier_level_4", "virtual_system_name", "dvc_name", "action_source", "srv_vm_uuid", "dest_vm_uuid", "tunnelid", "imei",
      "parent_session_id", "parent_start_time", "tunnel", "assoc_id", "chunks", "chunks_sent", "chunks_received", "rule_uuid",
      "http-2-connection", "link_change_count", "policy_id", "link_switches", "sdwan_cluster", "sdwan_device_type",
      "sdwan_cluster_type", "sdwan_site", "dynusergroup_name", "xff_address", "src_device_category", "src_device_profile",
      "src_device_model", "src_device_vendor", "src_device_os_family", "src_device_os_version", "src_hostname", "src_mac_address",
      "dest_device_category", "dest_device_profile", "dest_device_model", "dest_device_vendor", "dest_device_os_family",
      "dest_device_os_version", "dest_hostname", "dest_mac_address", "Container ID", "POD Namespace", "POD Name",
      "src_external_dynamic_list", "dest_external_dynamic_list", "host_id", "host_serial_number",
      "src_dynamic_address_group", "dest_dynamic_address_group", "session_owner", "high_resolution_timestamp",
      "nssai_sst", "nssai_sd", "Subcategory of app",
      "Category of app", "Technology of app", "Risk of app", "Characteristic of app", "Container of app", "Tunneled app",
      "SaaS of app", "Sanctioned State of app", "offloaded", "flow_type", "cluster_name"
    ]
  }
  # Most of the new PAN-OS 10 fields are not enabled yet, so they are pruned;
  # add them to the whitelist as they become available. The raw message is
  # kept by default, so no data is lost. Fields such as rule_uuid are pruned
  # because the rule name is available; the UUID is still in the message field.
  #
  # whitelist_names entries are regular expressions, so each one is anchored
  # with ^...$. Unanchored, "rule" also kept "rule_uuid" and "app" kept every
  # "... of app" field. "interpolate" is left at its default (false): no name
  # uses a %{field} reference, and the plugin documentation warns that
  # interpolation adds overhead.
  prune {
    whitelist_names => [
      "^@timestamp$", "^message$", "^receive_time$", "^serial_number$", "^log_type$", "^content_type$", "^generated_time$",
      "^src_ip$", "^dest_ip$", "^src_translated_ip$", "^dest_translated_ip$", "^rule$", "^user$", "^dest_user$", "^app$",
      "^virtual_system$", "^src_zone$", "^dest_zone$", "^src_interface$", "^dest_interface$", "^log_profile$", "^session_id$",
      "^repeat_count$", "^src_port$", "^dest_port$", "^src_translated_port$", "^dest_translated_port$", "^protocol$",
      "^action$", "^Bytes$", "^Bytes Sent$", "^Bytes Received$", "^packets$", "^start$", "^Elapsed Time$", "^category$",
      "^seqno$", "^action_flags$", "^src_location$", "^dest_location$", "^packets_out$", "^packets_in$",
      "^session_end_reason$", "^dvc_name$", "^action_source$", "^tunnelid$", "^tunnel$", "^Technology of app$"
    ]
  }
  # Numeric conversions. The former "geoip.*" conversions were removed: no
  # field with those names exists at this point (geoip runs later and writes
  # under src_geoip / dest_geoip), so they never had any effect.
  mutate {
    convert => {
      "Bytes" => "integer"
      "Bytes Sent" => "integer"
      "Bytes Received" => "integer"
      "Elapsed Time" => "integer"
      "dest_translated_port" => "integer"
      "src_translated_port" => "integer"
      "packets" => "integer"
      "packets_in" => "integer"
      "packets_out" => "integer"
      "seqno" => "integer"
    }
    add_tag => [ "PAN-OS_traffic" ]
  }
}
# Threat log fields: https://docs.paloaltonetworks.com/pan-os/10-0/pan-os-admin/monitoring/use-syslog-for-monitoring/syslog-field-descriptions/threat-log-fields.html
# Converted field names to CIM fields where possible; this allows use of CIM Traffic visualizations and reports
else if [message] =~ /,THREAT,/ {
  # THREAT logs: parse the CSV payload, keep only the whitelisted fields and
  # tag the event for the output section. The type is matched with its
  # delimiting commas so the word "THREAT" inside another field cannot select
  # this branch.
  csv {
    source => "message"
    columns => [
      "receive_time", "serial_number", "log_type", "Threat/Content Type", "Config Version", "generated_time", "src_ip", "dest_ip",
      "src_translated_ip", "dest_translated_ip", "rule", "user", "dest_user", "app", "virtual_system", "src_zone", "dest_zone",
      "src_interface", "dest_interface", "log_profile", "Time Logged", "session_id", "repeat_count", "src_port", "dest_port",
      "src_translated_port", "dest_translated_port", "tcp_flag", "protocol", "action", "threat_uri_name", "threat_id", "category",
      "severity", "direction", "seqno", "action_flags", "src_location", "dest_location", "FUTURE_USE", "contenttype", "pcap_id",
      "filedigest", "cloud", "url_idx", "user_agent", "filetype", "xff", "referer", "sender", "subject", "recipient", "reportid",
      "dg_hier_level_1", "dg_hier_level_2", "dg_hier_level_3", "dg_hier_level_4", "virtual_system_name", "Device Name", "file_url",
      "srv_vm_uuid", "dest_vm_uuid", "http_method", "tunnelid", "imei", "parent_session_id", "parent_start_time", "tunnel",
      "thr_catagory", "contentver", "sig_flags", "assoc_id", "ppid", "http_headers", "url_catagory_list", "rule_uuid",
      "http-2-connection", "dynusergroup_name", "XFF address", "Source Device Category", "Source Device Profile",
      "Source Device Model", "Source Device Vendor", "Source Device OS Family", "Source Device OS Version", "Source Hostname",
      "Source Mac Address", "Destination Device Category", "Destination Device Profile", "Destination Device Model",
      "Destination Device Vendor", "Destination Device OS Family", "Destination Device OS Version", "Destination Hostname",
      "Destination Mac Address", "Container ID", "POD Namespace", "POD Name", "Source External Dynamic List",
      "Destination External Dynamic List", "Host ID", "Serial Number", "domain_edl", "Source Dynamic Address Group",
      "Destination Dynamic Address Group", "partial_hash", "High Res Timestamp", "Reason", "justification", "nssai_sst",
      "Subcategory of app", "Category of app", "Technology of app", "Risk of app", "Characteristic of app", "Container of app",
      "Tunneled app", "SaaS of app", "Sanctioned State of app", "Cloud Report ID", "cluster_name", "flow_type"
    ]
  }
  # whitelist_names entries are regular expressions, so each one is anchored
  # with ^...$; unanchored, short names such as "user", "app" or "category"
  # also kept every longer field name containing them. "interpolate" is left
  # at its default (false) because no name uses a %{field} reference and the
  # plugin documentation warns that interpolation adds overhead.
  prune {
    whitelist_names => [
      "^@timestamp$", "^message$", "^receive_time$", "^serial_number$", "^log_type$", "^Threat/Content Type$",
      "^generated_time$", "^src_ip$", "^dest_ip$", "^src_translated_ip$", "^dest_translated_ip$", "^rule$", "^user$",
      "^dest_user$", "^app$", "^virtual_system$", "^src_zone$", "^dest_zone$", "^src_interface$", "^dest_interface$",
      "^log_profile$", "^session_id$", "^repeat_count$", "^src_port$", "^dest_port$", "^src_translated_port$",
      "^dest_translated_port$", "^tcp_flag$", "^protocol$", "^action$", "^threat_uri_name$", "^threat_id$", "^category$",
      "^severity$", "^direction$", "^seqno$", "^action_flags$", "^src_location$", "^dest_location$", "^contenttype$",
      "^pcap_id$", "^filedigest$", "^cloud$", "^url_idx$", "^user_agent$", "^filetype$", "^xff$", "^referer$", "^sender$",
      "^subject$", "^recipient$", "^reportid$", "^Source Hostname$", "^http_method$", "^tunnelid$", "^tunnel$",
      "^thr_catagory$", "^contentver$", "^assoc_id$", "^ppid$", "^http_headers$", "^url_catagory_list$", "^rule_uuid$",
      "^Device Name$", "^Risk of app$"
    ]
  }
  # Numeric conversions. The sequence number column is named "seqno" in the
  # csv block above; the previous "SequenceNumber" conversion referred to a
  # field that is never created. The former "geoip.*" conversions were
  # removed for the same reason: those fields do not exist at this point.
  mutate {
    convert => {
      "dest_translated_port" => "integer"
      "src_translated_port" => "integer"
      "seqno" => "integer"
    }
    add_tag => [ "PAN-OS_threat" ]
  }
}
#filter GlobalProtect - VPN client
else if [message] =~ /,GLOBALPROTECT,/ {
  # GlobalProtect (VPN client) logs: parse the CSV payload, keep only the
  # whitelisted fields and tag the event for the output section. The type is
  # matched with its delimiting commas so the word appearing inside another
  # field cannot select this branch.
  csv {
    source => "message"
    columns => [
      "receive_time", "Serial Number", "Type", "Threat/Content Type", "Config Version", "Generate Time", "Virtual System",
      "Event ID", "Stage", "Authentication Method", "Tunnel Type", "Source User", "Source Region", "Machine Name",
      "Public IP", "Public IPv6", "Private IP", "Private IPv6", "Host ID", "Serial_Number", "Client Version", "Client OS",
      "Client OS Version", "Repeat Count", "Reason", "Error", "Description", "Status", "Location", "Login Duration",
      "Connect Method", "Error Code", "Portal", "Sequence Number", "Action Flags", "High Res Timestamp", "Selection Type",
      "Response Time", "Priority", "Attempted Gateways", "Gateway", "DG Hierarchy Level 1", "DG Hierarchy Level 2",
      "DG Hierarchy Level 3", "DG Hierarchy Level 4", "Virtual System Name", "Device Name", "Virtual System ID",
      "cluster_name"
    ]
  }
  # whitelist_names entries are regular expressions, so each one is anchored
  # with ^...$; unanchored, "Type" also kept "Selection Type", "Public IP"
  # kept "Public IPv6", and "Virtual System" kept "Virtual System Name" and
  # "Virtual System ID". The duplicate "Serial Number" entry was dropped.
  # "interpolate" is left at its default (false) because no name uses a
  # %{field} reference and the plugin documentation warns that interpolation
  # adds overhead.
  prune {
    whitelist_names => [
      "^@timestamp$", "^message$", "^receive_time$", "^Serial Number$", "^Type$", "^Threat/Content Type$",
      "^Config Version$", "^Generate Time$", "^Virtual System$", "^Event ID$", "^Stage$", "^Authentication Method$",
      "^Tunnel Type$", "^Source User$", "^Source Region$", "^Machine Name$", "^Public IP$", "^Private IP$",
      "^Client Version$", "^Client OS$", "^Client OS Version$", "^Repeat Count$", "^Reason$", "^Error$", "^Status$",
      "^Location$", "^Login Duration$", "^Connect Method$", "^Error Code$", "^Portal$", "^High Res Timestamp$"
    ]
  }
  mutate {
    convert => {
      # NOTE(review): converting a serial number to an integer drops any
      # leading zeros and differs from the other branches, which keep it as
      # a string - confirm this is intended.
      "Serial Number" => "integer"
      "Login Duration" => "integer"
    }
    add_tag => [ "PAN-OS_GLOBALPROTECT" ]
  }
}
# Config log fields: https://docs.paloaltonetworks.com/pan-os/10-0/pan-os-admin/monitoring/use-syslog-for-monitoring/syslog-field-descriptions/config-log-fields.html
else if [message] =~ /,CONFIG,/ {
  # CONFIG logs: parse the CSV payload, keep only the whitelisted fields and
  # tag the event for the output section. The type is matched with its
  # delimiting commas so the word "CONFIG" inside another field (for example
  # a SYSTEM description) cannot select this branch.
  csv {
    source => "message"
    columns => [
      "FUTURE_USE", "receive_time", "serial_number", "Type", "Threat/Content Type", "Config Version", "generated_time", "src_ip",
      "virtual_system", "Command", "Admin", "Client", "Result", "Configuration_Path", "Sequence Number", "before_change_detail",
      "after_change_detail", "action_flags", "DG Hierarchy Level 1", "DG Hierarchy Level 2", "DG Hierarchy Level 3",
      "Virtual System Name", "DG Hierarchy Level 4", "dvc_name", "dg_id", "comment"
    ]
  }
  # whitelist_names entries are regular expressions, so each one is anchored
  # with ^...$ to keep exactly the listed fields. "interpolate" is left at its
  # default (false) because no name uses a %{field} reference and the plugin
  # documentation warns that interpolation adds overhead.
  prune {
    whitelist_names => [
      "^@timestamp$", "^message$", "^receive_time$", "^serial_number$", "^Type$", "^Threat/Content Type$",
      "^generated_time$", "^src_ip$", "^virtual_system$", "^Command$", "^Admin$", "^Client$", "^Result$",
      "^Configuration_Path$", "^before_change_detail$", "^after_change_detail$", "^Sequence Number$", "^action_flags$",
      "^dvc_name$"
    ]
  }
  mutate {
    add_tag => [ "PAN-OS_Config" ]
  }
}
else if [message] =~ /,SYSTEM,/ {
  # SYSTEM logs: parse the CSV payload, keep only the whitelisted fields,
  # extract DHCP lease details when present and tag the event for the output
  # section. The type is matched with its delimiting commas so the word
  # "SYSTEM" inside another field cannot select this branch.
  csv {
    source => "message"
    columns => [
      "receive_time", "Serial Number", "Type", "Threat/Content Type", "Config Version", "Generate Time", "Virtual System",
      "Event ID", "Object", "fmt", "id", "module", "Severity", "Description", "Sequence Number", "Action Flags",
      "DG Hierarchy Level 1", "DG Hierarchy Level 2", "DG Hierarchy Level 3", "DG Hierarchy Level 4", "Virtual System Name",
      "Device Name", "dg_id", "tpl_id", "High Res Timestamp"
    ]
  }
  # whitelist_names entries are regular expressions, so each one is anchored
  # with ^...$; unanchored, "Virtual System" also kept "Virtual System Name".
  # The whitelist previously listed "Generated Time", but the csv column is
  # named "Generate Time", so that field was always pruned; the entry now
  # matches the column name. "interpolate" is left at its default (false)
  # because no name uses a %{field} reference and the plugin documentation
  # warns that interpolation adds overhead.
  prune {
    whitelist_names => [
      "^@timestamp$", "^message$", "^receive_time$", "^Serial Number$", "^Type$", "^Threat/Content Type$",
      "^Generate Time$", "^Virtual System$", "^Event ID$", "^Object$", "^module$", "^Severity$", "^Description$",
      "^Sequence Number$", "^Action Flags$", "^Device Name$"
    ]
  }
  # Pull the leased IP, MAC address and hostname out of DHCP lease messages.
  # NOTE(review): %{WORD} stops at the first non-word character, so a
  # hostname containing "-" or "." is captured only up to that character -
  # confirm whether a wider pattern is wanted.
  if [Description] =~ /DHCP lease started/ {
    grok {
      match => { "Description" => "%{IP:lease_ip} --> mac %{MAC:mac_address} - hostname %{WORD:src_name}" }
      add_tag => [ "DHCP_LEASE" ]
    }
  }
  mutate {
    add_tag => [ "PAN-OS_System" ]
  }
}
# If your firewall is subject to any audits or PCI compliance, then you must leave the original RAW message in the logs,
# so leave this section commented out. If not, you can uncomment this mutate section, as you do not need the unparsed
# raw log to be in each document.
#
# mutate {
# # Original message has been fully parsed, so remove it.
# remove_field => ["message"]
# }
# GeoIP enrichment for the source address. Runs only when src_ip is set and
# does not start with 127.0.0.1, an RFC1918 prefix or 169.254.; the result is
# stored under src_geoip.
if [src_ip] and [src_ip] !~ "(^127\.0\.0\.1)|(^10\.)|(^172\.1[6-9]\.)|(^172\.2[0-9]\.)|(^172\.3[0-1]\.)|(^192\.168\.)|(^169\.254\.)" {
  geoip {
    target => "src_geoip"
    source => "src_ip"
  }
  # Blank the location when it is "0,0".
  # NOTE(review): [src_geoip.location] names one top-level field literally
  # called "src_geoip.location"; the nested field written by geoip would be
  # [src_geoip][location]. Confirm whether this check can ever match.
  if ([src_geoip.location] and [src_geoip.location] =~ "0,0") {
    mutate {
      replace => [ "src_geoip.location", "" ]
    }
  }
}
# GeoIP enrichment for the destination address. Runs only when dest_ip is set
# and does not start with 127.0.0.1, an RFC1918 prefix or 169.254.; the result
# is stored under dest_geoip.
if [dest_ip] and [dest_ip] !~ "(^127\.0\.0\.1)|(^10\.)|(^172\.1[6-9]\.)|(^172\.2[0-9]\.)|(^172\.3[0-1]\.)|(^192\.168\.)|(^169\.254\.)" {
  geoip {
    target => "dest_geoip"
    source => "dest_ip"
  }
  # Blank the location when it is "0,0".
  # NOTE(review): [dest_geoip.location] names one top-level field literally
  # called "dest_geoip.location"; the nested field written by geoip would be
  # [dest_geoip][location]. Confirm whether this check can ever match.
  if ([dest_geoip.location] and [dest_geoip.location] =~ "0,0") {
    mutate {
      replace => [ "dest_geoip.location", "" ]
    }
  }
}
# Fingerprint each flow: the 5-tuple (source address, source port, destination
# address, destination port, protocol) is concatenated and hashed with SHA1
# using the key below. This makes top-N terms queries possible on whole flows
# rather than on a single field.
if [src_ip] and [dest_ip] {
  fingerprint {
    source => ["src_ip", "src_port", "dest_ip", "dest_port", "protocol" ]
    concatenate_sources => true
    method => "SHA1"
    key => "logstash"
  }
}
# Resolve IP addresses to names. The DNS filter can become a performance
# problem when receiving more than about 400 requests per second.
# NOTE(review): the original author advises against the DNS filter cache
# parameters because they force single-threaded processing regardless of the
# worker count - verify against the installed logstash-filter-dns version.
# For best performance, install dnsmasq on the Logstash server and point it at
# an internal DNS server so local RFC1918 addresses resolve.
if [src_ip] {
  # Seed src_name with the address; the reverse lookup below replaces it in
  # place with the resolved name.
  mutate {
    add_field => { "src_name" => "%{src_ip}" }
  }
  dns {
    nameserver => [ "10.164.10.115" ]
    reverse => ["src_name"]
    action => "replace"
  }
}
# Reverse-resolve the destination address into dest_name.
if [dest_ip] {
  # Seed dest_name with the address; the reverse lookup below replaces it in
  # place with the resolved name.
  mutate {
    add_field => { "dest_name" => "%{dest_ip}" }
  }
  dns {
    nameserver => ["10.164.10.116"]
    reverse => ["dest_name"]
    action => "replace"
  }
}
}
}
# Every single log will be forwarded to ElasticSearch. If you are using another port, you should specify it here.
# Route each event to an Elasticsearch index chosen by the tag added in the
# filter section; events carrying none of the known tags go to
# "panos-undefined". All branches write to the same two Elasticsearch hosts.
output {
  if "PAN-OS_traffic" in [tags] {
    # Traffic logs
    elasticsearch {
      hosts => ["10.164.254.70:9200", "10.164.254.71:9200"]
      index => "panos-traffic"
    }
  }
  else if "PAN-OS_threat" in [tags] {
    # Threat logs
    elasticsearch {
      hosts => ["10.164.254.70:9200", "10.164.254.71:9200"]
      index => "panos-threat"
    }
  }
  else if "PAN-OS_Config" in [tags] {
    # Configuration logs
    elasticsearch {
      hosts => ["10.164.254.70:9200", "10.164.254.71:9200"]
      index => "panos-config"
    }
  }
  else if "PAN-OS_GLOBALPROTECT" in [tags] {
    # GlobalProtect logs
    elasticsearch {
      hosts => ["10.164.254.70:9200", "10.164.254.71:9200"]
      index => "panos-global-protect"
    }
  }
  else if "PAN-OS_System" in [tags] {
    # System logs
    elasticsearch {
      hosts => ["10.164.254.70:9200", "10.164.254.71:9200"]
      index => "panos-system"
    }
  }
  else {
    # Anything that matched no known log type
    elasticsearch {
      hosts => ["10.164.254.70:9200", "10.164.254.71:9200"]
      index => "panos-undefined"
    }
  }
}
Thank you.