Hello,
I am using Logstash to process PAN-OS syslog data and I'm facing an issue with the file name timestamp. I want the output file names to be in the format YYYY-MM-DD with the time zone set to "Europe/Berlin" (Germany time zone).
Despite configuring the Europe/Berlin timezone, Logstash continues to use UTC time in the output file names instead of the correct timezone.
Here is the relevant part of my Logstash configuration:
Logstash Input Configuration:
input {
syslog {
timezone => "Europe/Berlin"
port => "5514"
type => "syslog"
tags => [ "PAN-OS_syslog" ]
}
}
Logstash Output Configuration:
output {
if "PAN-OS_traffic" in [tags] {
file {
path => "/var/log/paloalto_5651/panos-traffic-%{+YYYY-MM-dd}.log"
codec => json_lines
}
}
...
}
Issue:
My goal is to have the file name like panos-traffic-2024-12-08.log with the date in the "Europe/Berlin" time zone (UTC+1, or UTC+2 during daylight saving time). However, even after configuring Europe/Berlin, Logstash still uses UTC for the file name.
I have tried various configurations for getting the timestamp in the file name:
Using a custom timestamp field like timestamp_berlin.
Experimenting with different syntax for the placeholder.
Despite these attempts, Logstash still uses UTC for the file name timestamp.
Additionally, I have set the system time correctly and enabled NTP to synchronize the system clock with an external time source, ensuring that the system time is accurate. However, this does not seem to affect Logstash's behavior in using UTC time.
I would appreciate any suggestions or solutions to help me correctly apply the Germany time zone in the output file names.
Thank you in advance!
pan-os.conf
input {
syslog {
timezone => "Europe/Berlin"
port => "5514"
type => "syslog"
tags => [ "PAN-OS_syslog" ]
}
}
# ***NOTE on PRUNE***
# PA seems to add fields in pacthes and some fields do not apply to most environements
# So the prune command eliminates all undefined fields and unneeded to store only what your envionment uses
# If you want more fields than being stored, add them to the whitelist in the prune statement
# This also eliminates the columnX in the created document
filter {
if "PAN-OS_syslog" in [tags] {
# Log types are "TRAFFIC", "THREAT", "CONFIG", "SYSTEM" and "HIP-MATCH".
# Traffic log fields: https://docs.paloaltonetworks.com/pan-os/10-0/pan-os-admin/monitoring/use-syslog-for-monitoring/syslog-field-descriptions/traffic-log-fields.html
#converted field names to CIM fields where possible, allows use of CIM Traffic visualitations and reports
if ([message] =~ /TRAFFIC/) {
csv {
source => "message"
columns => [
"FUTURE_USE","receive_time","serial_number","log_type","content_type","FUTURE_USE","generated_time","src_ip","dest_ip","src_translated_ip",
"dest_translated_ip","rule","user","dest_user","app","virtual_system","src_zone","dest_zone","src_interface","dest_interface","log_profile",
"FUTURE_USE","session_id","repeat_count","src_port","dest_port","src_translated_port","dest_translated_port","tcp_flag","protocol","action",
"bytes","bytes_out","bytes_in","packets","start","elapsed_sec","category","FUTURE_USE","seqno","action_flags","src_location",
"dest_location","FUTURE_USE","packets_out","packets_in","session_end_reason","dg_hier_level_1","dg_hier_level_2","dg_hier_level_3",
"dg_hier_level_4","virtual_system_name","dvc_name","action_source","srv_vm_uuid","dest_vm_uuid","tunnelid","imei","parent_session_id"
,"parent_start_time","tunnel","assoc_id","chunks","chunks_sent","chunks_received","rule_uuid","http-2-connection","link_change_count",
"policy_id","link_switches","sdwan_cluster","sdwan_device_type","sdwan_site","dynusergroup_name","xff_address","src_device_category",
"src_device_profile","src_device_model","src_device_vendor","src_device_os_family","src_device_os_version","src_hostname",
"src_mac_address","dest_device_category","dest_device_profile","dest_device_model","dest_device_vendor","dest_device_os_family",
"dest_device_os_version","dest_hostname","dest_mac_address","src_external_dynamic_list","dest_external_dynamic_list","host_id",
"serial_number","session_owner","high_resolution_timestamp"
]
}
# Most of the new fields in PANOS 10 are not enabled yet, so they are pruned, will add them to the whitelist as they become available
# But note, default to is store the entire raw message so no data is lost as you can interogate the message field
# Also, some fields like rule_uuid are pruned as rule name is avaiable. If you need the UUID you can see it in the message field
prune {
interpolate => true
whitelist_names => [
"@timestamp","message","receive_time","serial_number","log_type","content_type","generated_time","src_ip","dest_ip","src_translated_ip",
"dest_translated_ip","rule","user","dest_user","app","virtual_system","src_zone","dest_zone","src_interface","dest_interface",
"log_profile","session_id","repeat_count","src_port","dest_port","src_translated_port","dest_translated_port","protocol",
"action","bytes","bytes_out","bytes_in","packets","start","elapsed_sec","category","seqno","action_flags","src_location",
"dest_location","packets_out","packets_in","session_end_reason","dvc_name","action_source","tunnelid","tunnel","http-2-connection"
]
}
mutate {
convert => [ "bytes", "integer" ]
convert => [ "bytes_in", "integer" ]
convert => [ "bytes_out", "integer" ]
convert => [ "elapsed_sec", "integer" ]
convert => [ "geoip.dma_code", "integer" ]
convert => [ "geoip.latitude", "float" ]
convert => [ "geoip.longitude", "float" ]
convert => [ "dest_translated_port", "integer" ]
convert => [ "src_translated_port", "integer" ]
convert => [ "packets", "integer" ]
convert => [ "packets_in", "integer" ]
convert => [ "packets_out", "integer" ]
convert => [ "seqno", "integer" ]
add_tag => [ "PAN-OS_traffic"]
}
}
# Threat log fields: https://docs.paloaltonetworks.com/pan-os/10-0/pan-os-admin/monitoring/use-syslog-for-monitoring/syslog-field-descriptions/threat-log-fields.html
#converted field names to CIM fields where possible, allows use of CIM Traffic visualitations and reports
else if ([message] =~ /THREAT/) {
csv {
source => "message"
columns => [
"FUTURE_USE-1","receive_time","serial_number","log_type","content_type","FUTURE_USE-2","generated_time","src_ip","dest_ip",
"src_translated_ip","dest_translated_ip","rule","user","dest_user","app","virtual_system","src_zone","dest_zone","src_interface",
"dest_interface","log_profile","FUTURE_USE-3","session_id","repeat_count","src_port","dest_port","src_translated_port",
"dest_translated_port","tcp_flag","protocol","action","threat_uri_name","threat_id","category","severity","direction","seqno","action_flags",
"src_location","dest_location","FUTURE_USE-4","contenttype","pcap_id","filedigest","cloud","url_idx","user_agent","filetype","xff","referer",
"sender","subject","recipient","reportid","dg_hier_level_1","dg_hier_level_2","dg_hier_level_3","dg_hier_level_4","virtual_system_name",
"dvc_name","FUTURE_USE-5","srv_vm_uuid","dest_vm_uuid","http_method","tunnelid","imei","parent_session_id","parent_start_time","tunnel",
"thr_catagory","contentver","FUTURE_USE-6","assoc_id","ppid","http_headers","url_catagory_list","rule_uuid","http-2-connection","dynusergroup_name"
]
}
prune {
interpolate => true
whitelist_names => [
"@timestamp","message","receive_time","serial_number","log_type","content_type","generated_time","src_ip","dest_ip",
"src_translated_ip","dest_translated_ip","rule","user","dest_user","app","virtual_system","src_zone","dest_zone","src_interface",
"dest_interface","log_profile","session_id","repeat_count","src_port","dest_port","src_translated_port",
"dest_translated_port","tcp_flag","protocol","action","threat_uri_name","threat_id","category","severity","direction","seqno","action_flags",
"src_location","dest_location","contenttype","pcap_id","filedigest","cloud","url_idx","user_agent","filetype","xff","referer",
"sender","subject","recipient","reportid",
"dvc_name","http_method","tunnelid","tunnel",
"thr_catagory","contentver","assoc_id","ppid","http_headers","url_catagory_list","rule_uuid","http-2-connection","dynusergroup_name"
]
}
mutate {
convert => [ "geoip.dma_code", "integer" ]
convert => [ "geoip.latitude", "float" ]
convert => [ "geoip.longitude", "float" ]
convert => [ "dest_translated_port", "integer" ]
convert => [ "src_translated_port", "integer" ]
convert => [ "SequenceNumber", "integer" ]
add_tag => ["PAN-OS_threat"]
}
}
# Config log fields: https://docs.paloaltonetworks.com/pan-os/10-0/pan-os-admin/monitoring/use-syslog-for-monitoring/syslog-field-descriptions/config-log-fields.html
else if ([message] =~ /CONFIG/) {
csv {
source => "message"
columns => [
"FUTURE_USE","receive_time","serial_number","log_type","Subtype","FUTURE_USE","generated_time","src_ip","virtual_system",
"Command","Admin","Client","Result","Configuration Path","Before Change Detail","After Change Detail","seqno","action_flags",
"dg_hier_level_1","dg_hier_level_2","dg_hier_level_3","dg_hier_level_4","virtual_system_name","dvc_name"
]
}
prune {
interpolate => true
whitelist_names => [
"@timestamp","message","receive_time","serial_number","log_type","Subtype","generated_time","src_ip","virtual_system",
"Command","Admin","Client","Result","Configuration Path","Before Change Detail","After Change Detail","seqno","action_flags","dvc_name"
]
}
mutate {
add_tag => [ "PAN-OS_Config"]
}
}
else if ([message] =~ /SYSTEM/) {
csv {
source => "message"
columns => [
"FUTURE_USE","receive_time","serial_number","log_type","Subtype","FUTURE_USE","generated_time","virtual_system","event_id",
"Object","FUTURE_USE","FUTURE_USE","Module","severity","Description","seqno","action_flags","dg_hier_level_1",
"dg_hier_level_2","dg_hier_level_3","dg_hier_level_4","virtual_system_name","dvc_name"
]
}
prune {
interpolate => true
whitelist_names => [
"@timestamp","message","receive_time","serial_number","log_type","Subtype","generated_time","virtual_system","event_id",
"Object","Module","severity","Description","seqno","action_flags","dvc_name"
]
}
if ([Description] =~ /DHCP lease started/) {
grok {
match => {"Description" => "%{IP:lease_ip} --> mac %{MAC:mac_address} - hostname %{WORD:src_name}" }
add_tag => [ "DHCP_LEASE"]
}
}
mutate {
add_tag => [ "PAN-OS_System"]
}
}
# If your firewall is subject to any audits or PCI compliance, then you must leave the orginal RAW message in the logs
# so leave this section commented out. If not, you can uncomment this mutate section as you do not need the unparsed
# raw log to be in each document.
#
# mutate {
# # Original message has been fully parsed, so remove it.
# remove_field => [ "message" ]
# }
# Geolocate logs that have src_ip and it is a non-RFC1918 address
if [src_ip] and [src_ip] !~ "(^127\.0\.0\.1)|(^10\.)|(^172\.1[6-9]\.)|(^172\.2[0-9]\.)|(^172\.3[0-1]\.)|(^192\.168\.)|(^169\.254\.)" {
geoip {
source => "src_ip"
target => "src_geoip"
}
# Delete 0,0 in src_geo.location if equal to 0,0
if ([src_geoip.location] and [src_geoip.location] =~ "0,0") {
mutate {
replace => [ "src_geoip.location", "" ]
}
}
}
# Geolocate logs that have dest_ip and if that dest_ip is a non-RFC1918 address
if [dest_ip] and [dest_ip] !~ "(^127\.0\.0\.1)|(^10\.)|(^172\.1[6-9]\.)|(^172\.2[0-9]\.)|(^172\.3[0-1]\.)|(^192\.168\.)|(^169\.254\.)" {
geoip {
source => "dest_ip"
target => "dest_geoip"
}
# Delete 0,0 in dest_geoip.location if equal to 0,0
if ([dest_geoip.location] and [dest_geoip.location] =~ "0,0") {
mutate {
replace => [ "dest_geoip.location", "" ]
}
}
}
# Takes the 5-tuple of source address, source port, destination address, destination port, and protocol and does a SHA1 hash to fingerprint the flow. This is a useful
# way to be able to do top N terms queries on flows, not just on one field.
if [src_ip] and [dest_ip] {
fingerprint {
concatenate_sources => true
method => "SHA1"
key => "logstash"
source => [ "src_ip", "src_port", "dest_ip", "dest_port", "protocol" ]
}
}
#Resolve IP address to names. The DNS filter can be a performance issue is receing over 400 requstes per second.
#Do not use Logstash DNS Filter Cache parameters, it forces a single thread process regaurdless of how many workers you have
#For best performance, install dnsmasq on the logstash server
#Make sure your dnsmasq points to an internal DNS to resolve local RFC1918 addresses
if [src_ip] {
mutate {
add_field => { "src_name" => "%{src_ip}" }
}
dns {
reverse => [ "src_name"]
action => "replace"
nameserver => [ "127.0.0.1" ]
}
}
if [dest_ip] {
mutate {
add_field => { "dest_name" => "%{dest_ip}" }
}
dns {
reverse => [ "dest_name"]
action => "replace"
nameserver => [ "127.0.0.1" ]
}
}
}
}
output {
if "PAN-OS_traffic" in [tags] {
file {
path => "/var/log/paloalto_5651/panos-traffic-%{+YYYY-MM-dd}.log"
codec => json_lines
}
elasticsearch {
index => "panos-traffic"
hosts => ["localhost:9200"]
user => "elastic"
password => "password"
}
}
else if "PAN-OS_threat" in [tags] {
file {
path => "/var/log/paloalto_5651/panos-threat-%{+YYYY-MM-dd}.log"
codec => json_lines
}
elasticsearch {
index => "panos-threat"
hosts => ["localhost:9200"]
user => "elastic"
password => "password"
}
}
else if "PAN-OS_Config" in [tags] {
file {
path => "/var/log/paloalto_5651/panos-config-%{+YYYY-MM-dd}.log"
codec => json_lines
}
elasticsearch {
index => "panos-config"
hosts => ["localhost:9200"]
user => "elastic"
password => "password"
}
}
else if "PAN-OS_System" in [tags] {
file {
path => "/var/log/paloalto_5651/panos-system-%{+YYYY-MM-dd}.log"
codec => json_lines
}
elasticsearch {
index => "panos-system"
hosts => ["localhost:9200"]
user => "elastic"
password => "password"
}
}
else {
file {
path => "/var/log/paloalto_5651/panos-undefined-%{+YYYY-MM-dd}.log"
codec => json_lines
}
elasticsearch {
index => "panos-undefined"
hosts => ["localhost:9200"]
user => "elastic"
password => "password"
}
}
}