Logstash File Naming Issue with Timezone (Output File Names Stay in UTC)

Hello,

I am using Logstash to process PAN-OS syslog data and I'm facing an issue with the file name timestamp. I want the output file names to be in the format YYYY-MM-DD with the time zone set to "Europe/Berlin" (Germany time zone).

Despite configuring the Europe/Berlin timezone, Logstash continues to use UTC time in the output file names instead of the correct timezone.

Here is the relevant part of my Logstash configuration:

Logstash Input Configuration:

input {
    syslog {
        timezone => "Europe/Berlin"
        port => "5514"
        type => "syslog"
        tags => [ "PAN-OS_syslog" ]
    }
}

Logstash Output Configuration:

output {
    if "PAN-OS_traffic" in [tags] {
        file {
            path => "/var/log/paloalto_5651/panos-traffic-%{+YYYY-MM-dd}.log"
            codec => json_lines
        }
    }
    ...
}

Issue:
My goal is to have the file name like panos-traffic-2024-12-08.log with the date in the "Europe/Berlin" time zone (UTC+1, or UTC+2 during daylight saving time). However, even after configuring Europe/Berlin, Logstash still uses UTC for the file name.

I have tried various configurations for getting the timestamp in the file name:

- Using a custom timestamp field like timestamp_berlin.
- Experimenting with different syntax for the placeholder.

Despite these attempts, Logstash still uses UTC for the file name timestamp.

Additionally, I have set the system time correctly and enabled NTP to synchronize the system clock with an external time source, ensuring that the system time is accurate. However, this does not seem to affect Logstash's behavior in using UTC time.

I would appreciate any suggestions or solutions to help me correctly apply the Germany time zone in the output file names.

Thank you in advance!

pan-os.conf


input {
    syslog {
        timezone => "Europe/Berlin"
        port => "5514"
        type => "syslog"
        tags => [ "PAN-OS_syslog" ]
    }
}


                # ***NOTE on PRUNE***
                #  PA seems to add fields in patches, and some fields do not apply to most environments,
                #  so the prune filter eliminates all undefined and unneeded fields to store only what your environment uses.
                #  If you want more fields stored, add them to the whitelist in the prune statement.
                #  This also eliminates the columnX fields in the created document.

filter {
    if "PAN-OS_syslog" in [tags] {

        # Log types are "TRAFFIC", "THREAT", "CONFIG", "SYSTEM" and "HIP-MATCH".

        # Traffic log fields: https://docs.paloaltonetworks.com/pan-os/10-0/pan-os-admin/monitoring/use-syslog-for-monitoring/syslog-field-descriptions/traffic-log-fields.html
        # Converted field names to CIM fields where possible; allows use of CIM Traffic visualizations and reports

        if ([message] =~ /TRAFFIC/) {
            csv {
                source => "message"
                columns => [
                "FUTURE_USE","receive_time","serial_number","log_type","content_type","FUTURE_USE","generated_time","src_ip","dest_ip","src_translated_ip",
                                "dest_translated_ip","rule","user","dest_user","app","virtual_system","src_zone","dest_zone","src_interface","dest_interface","log_profile",
                                "FUTURE_USE","session_id","repeat_count","src_port","dest_port","src_translated_port","dest_translated_port","tcp_flag","protocol","action",
                                "bytes","bytes_out","bytes_in","packets","start","elapsed_sec","category","FUTURE_USE","seqno","action_flags","src_location",
                                "dest_location","FUTURE_USE","packets_out","packets_in","session_end_reason","dg_hier_level_1","dg_hier_level_2","dg_hier_level_3",
                                "dg_hier_level_4","virtual_system_name","dvc_name","action_source","srv_vm_uuid","dest_vm_uuid","tunnelid","imei","parent_session_id"
                                ,"parent_start_time","tunnel","assoc_id","chunks","chunks_sent","chunks_received","rule_uuid","http-2-connection","link_change_count",
                                "policy_id","link_switches","sdwan_cluster","sdwan_device_type","sdwan_site","dynusergroup_name","xff_address","src_device_category",
                                "src_device_profile","src_device_model","src_device_vendor","src_device_os_family","src_device_os_version","src_hostname",
                                "src_mac_address","dest_device_category","dest_device_profile","dest_device_model","dest_device_vendor","dest_device_os_family",
                                "dest_device_os_version","dest_hostname","dest_mac_address","src_external_dynamic_list","dest_external_dynamic_list","host_id",
                                "serial_number","session_owner","high_resolution_timestamp"
                ]
            }
                        # Most of the new fields in PAN-OS 10 are not enabled yet, so they are pruned; add them to the whitelist as they become available.
                        # Note: the default is to store the entire raw message, so no data is lost; you can interrogate the message field.
                        # Also, some fields like rule_uuid are pruned because the rule name is available. If you need the UUID, it is still in the message field.
            prune {
                        interpolate => true
                        whitelist_names => [
                                 "@timestamp","message","receive_time","serial_number","log_type","content_type","generated_time","src_ip","dest_ip","src_translated_ip",
                                 "dest_translated_ip","rule","user","dest_user","app","virtual_system","src_zone","dest_zone","src_interface","dest_interface",
                                 "log_profile","session_id","repeat_count","src_port","dest_port","src_translated_port","dest_translated_port","protocol",
                                 "action","bytes","bytes_out","bytes_in","packets","start","elapsed_sec","category","seqno","action_flags","src_location",
                                 "dest_location","packets_out","packets_in","session_end_reason","dvc_name","action_source","tunnelid","tunnel","http-2-connection"
                                    ]
                    }

            mutate {
                convert => [ "bytes", "integer" ]
                convert => [ "bytes_in", "integer" ]
                convert => [ "bytes_out", "integer" ]
                convert => [ "elapsed_sec", "integer" ]
                convert => [ "geoip.dma_code", "integer" ]
                convert => [ "geoip.latitude", "float" ]
                convert => [ "geoip.longitude", "float" ]
                convert => [ "dest_translated_port", "integer" ]
                convert => [ "src_translated_port", "integer" ]
                convert => [ "packets", "integer" ]
                convert => [ "packets_in", "integer" ]
                convert => [ "packets_out", "integer" ]
                convert => [ "seqno", "integer" ]

                add_tag => [ "PAN-OS_traffic"]
            }


        }


        # Threat log fields: https://docs.paloaltonetworks.com/pan-os/10-0/pan-os-admin/monitoring/use-syslog-for-monitoring/syslog-field-descriptions/threat-log-fields.html
        # Converted field names to CIM fields where possible; allows use of CIM Traffic visualizations and reports
        else if ([message] =~ /THREAT/) {
            csv {
                source => "message"
                columns => [
                  "FUTURE_USE-1","receive_time","serial_number","log_type","content_type","FUTURE_USE-2","generated_time","src_ip","dest_ip",
                  "src_translated_ip","dest_translated_ip","rule","user","dest_user","app","virtual_system","src_zone","dest_zone","src_interface",
                  "dest_interface","log_profile","FUTURE_USE-3","session_id","repeat_count","src_port","dest_port","src_translated_port",
                  "dest_translated_port","tcp_flag","protocol","action","threat_uri_name","threat_id","category","severity","direction","seqno","action_flags",
                  "src_location","dest_location","FUTURE_USE-4","contenttype","pcap_id","filedigest","cloud","url_idx","user_agent","filetype","xff","referer",
                  "sender","subject","recipient","reportid","dg_hier_level_1","dg_hier_level_2","dg_hier_level_3","dg_hier_level_4","virtual_system_name",
                  "dvc_name","FUTURE_USE-5","srv_vm_uuid","dest_vm_uuid","http_method","tunnelid","imei","parent_session_id","parent_start_time","tunnel",
                  "thr_catagory","contentver","FUTURE_USE-6","assoc_id","ppid","http_headers","url_catagory_list","rule_uuid","http-2-connection","dynusergroup_name"

                ]
            }

             prune {
                        interpolate => true
                        whitelist_names => [
                  "@timestamp","message","receive_time","serial_number","log_type","content_type","generated_time","src_ip","dest_ip",
                  "src_translated_ip","dest_translated_ip","rule","user","dest_user","app","virtual_system","src_zone","dest_zone","src_interface",
                  "dest_interface","log_profile","session_id","repeat_count","src_port","dest_port","src_translated_port",
                  "dest_translated_port","tcp_flag","protocol","action","threat_uri_name","threat_id","category","severity","direction","seqno","action_flags",
                  "src_location","dest_location","contenttype","pcap_id","filedigest","cloud","url_idx","user_agent","filetype","xff","referer",
                  "sender","subject","recipient","reportid",
                  "dvc_name","http_method","tunnelid","tunnel",
                  "thr_catagory","contentver","assoc_id","ppid","http_headers","url_catagory_list","rule_uuid","http-2-connection","dynusergroup_name"

                                    ]
                    }


            mutate {
                convert => [ "geoip.dma_code", "integer" ]
                convert => [ "geoip.latitude", "float" ]
                convert => [ "geoip.longitude", "float" ]
                convert => [ "dest_translated_port", "integer" ]
                convert => [ "src_translated_port", "integer" ]
                convert => [ "SequenceNumber", "integer" ]

                add_tag => ["PAN-OS_threat"]
            }

        }

    # Config log fields:  https://docs.paloaltonetworks.com/pan-os/10-0/pan-os-admin/monitoring/use-syslog-for-monitoring/syslog-field-descriptions/config-log-fields.html
        else if ([message] =~ /CONFIG/) {
            csv {
                source => "message"
                columns => [
                     "FUTURE_USE","receive_time","serial_number","log_type","Subtype","FUTURE_USE","generated_time","src_ip","virtual_system",
                     "Command","Admin","Client","Result","Configuration Path","Before Change Detail","After Change Detail","seqno","action_flags",
                     "dg_hier_level_1","dg_hier_level_2","dg_hier_level_3","dg_hier_level_4","virtual_system_name","dvc_name"

                ]
            }
             prune {
                        interpolate => true
                        whitelist_names => [
                  "@timestamp","message","receive_time","serial_number","log_type","Subtype","generated_time","src_ip","virtual_system",
                     "Command","Admin","Client","Result","Configuration Path","Before Change Detail","After Change Detail","seqno","action_flags","dvc_name"

                                    ]
                    }


            mutate {
                add_tag => [ "PAN-OS_Config"]
            }
        }

        else if ([message] =~ /SYSTEM/) {
            csv {
                source => "message"
                columns => [
                    "FUTURE_USE","receive_time","serial_number","log_type","Subtype","FUTURE_USE","generated_time","virtual_system","event_id",
                    "Object","FUTURE_USE","FUTURE_USE","Module","severity","Description","seqno","action_flags","dg_hier_level_1",
                    "dg_hier_level_2","dg_hier_level_3","dg_hier_level_4","virtual_system_name","dvc_name"

                ]
            }
             prune {
                        interpolate => true
                        whitelist_names => [
                  "@timestamp","message","receive_time","serial_number","log_type","Subtype","generated_time","virtual_system","event_id",
                    "Object","Module","severity","Description","seqno","action_flags","dvc_name"
                                    ]
                    }
              if [Description] =~ /DHCP lease started/ {
                  grok {
                      match => { "Description" => "%{IP:lease_ip} --> mac %{MAC:mac_address} - hostname %{WORD:src_name}" }
                      add_tag => [ "DHCP_LEASE" ]
                  }
              }


            mutate {
                add_tag => [ "PAN-OS_System"]
            }
        }


  #  If your firewall is subject to any audits or PCI compliance, then you must leave the original RAW message
  #  in the logs, so leave this section commented out. If not, you can uncomment this mutate section, as you do
  #  not need the unparsed raw log in each document.
  #
  #      mutate {
  #          # Original message has been fully parsed, so remove it.
  #          remove_field => [ "message" ]
  #      }

        # Geolocate logs that have src_ip and it is a non-RFC1918 address
        if [src_ip] and [src_ip] !~ "(^127\.0\.0\.1)|(^10\.)|(^172\.1[6-9]\.)|(^172\.2[0-9]\.)|(^172\.3[0-1]\.)|(^192\.168\.)|(^169\.254\.)" {
            geoip {
               source => "src_ip"
               target => "src_geoip"
          }

            # Remove src_geoip.location if it is 0,0
            if [src_geoip][location] and [src_geoip][location] =~ "0,0" {
                mutate {
                    replace => [ "[src_geoip][location]", "" ]
                }
            }
        }

        # Geolocate logs that have dest_ip and if that dest_ip is a non-RFC1918 address
        if [dest_ip] and [dest_ip] !~ "(^127\.0\.0\.1)|(^10\.)|(^172\.1[6-9]\.)|(^172\.2[0-9]\.)|(^172\.3[0-1]\.)|(^192\.168\.)|(^169\.254\.)" {
            geoip {
                source => "dest_ip"
                target => "dest_geoip"
            }

            # Remove dest_geoip.location if it is 0,0
            if [dest_geoip][location] and [dest_geoip][location] =~ "0,0" {
                mutate {
                    replace => [ "[dest_geoip][location]", "" ]
                }
            }
        }

        # Takes the 5-tuple of source address, source port, destination address, destination port, and protocol
        # and computes a SHA1 hash to fingerprint the flow. This is a useful way to run top-N terms queries on
        # flows rather than on a single field.
        if [src_ip] and [dest_ip] {
            fingerprint {
                concatenate_sources => true
                method => "SHA1"
                key => "logstash"
                source => [ "src_ip", "src_port", "dest_ip", "dest_port", "protocol" ]
            }
        }

        # Resolve IP addresses to names. The DNS filter can be a performance issue if receiving over 400 requests per second.
        # Do not use the Logstash DNS filter cache parameters; they force a single-threaded process regardless of how many workers you have.
        # For best performance, install dnsmasq on the Logstash server.
        # Make sure your dnsmasq points to an internal DNS server that can resolve local RFC1918 addresses.
        if [src_ip] {
            mutate {
            add_field => { "src_name" => "%{src_ip}" }
            }
            dns {
                reverse => [ "src_name"]
                action => "replace"
               nameserver => [ "127.0.0.1" ]

            }
       }
       if [dest_ip] {
            mutate {
            add_field => { "dest_name" => "%{dest_ip}" }
            }
            dns {
                reverse => [ "dest_name"]
                action => "replace"
                nameserver => [ "127.0.0.1" ]

            }
       }

    }
}

output {
    if "PAN-OS_traffic" in [tags] {
        file {
            path => "/var/log/paloalto_5651/panos-traffic-%{+YYYY-MM-dd}.log"
            codec => json_lines
        }
        elasticsearch {
            index => "panos-traffic"
            hosts => ["localhost:9200"]
            user => "elastic"
            password => "password"
        }
    }

    else if "PAN-OS_threat" in [tags] {
        file {
            path => "/var/log/paloalto_5651/panos-threat-%{+YYYY-MM-dd}.log"
            codec => json_lines
        }
        elasticsearch {
            index => "panos-threat"
            hosts => ["localhost:9200"]
            user => "elastic"
            password => "password"
        }
    }

    else if "PAN-OS_Config" in [tags] {
        file {
            path => "/var/log/paloalto_5651/panos-config-%{+YYYY-MM-dd}.log"
            codec => json_lines
        }
        elasticsearch {
            index => "panos-config"
            hosts => ["localhost:9200"]
            user => "elastic"
            password => "password"
        }
    }

    else if "PAN-OS_System" in [tags] {
        file {
            path => "/var/log/paloalto_5651/panos-system-%{+YYYY-MM-dd}.log"
            codec => json_lines
        }
        elasticsearch {
            index => "panos-system"
            hosts => ["localhost:9200"]
            user => "elastic"
            password => "password"
        }
    }

    else {
        file {
            path => "/var/log/paloalto_5651/panos-undefined-%{+YYYY-MM-dd}.log"
            codec => json_lines
        }
        elasticsearch {
            index => "panos-undefined"
            hosts => ["localhost:9200"]
            user => "elastic"
            password => "password"
        }
    }
}

Hello and welcome,

This happens because all date fields in Logstash are in UTC, and this cannot be changed. The timezone option that you have in some filters is used when a date string does not include timezone information and is not in UTC, so Logstash can apply the correct offset when creating the date in UTC.

When you use %{+YYYY-MM-dd} in Logstash, it takes the date from the @timestamp field, which is always in UTC.

To get your local time in the output you need to use some ruby code to create a string field with the value for YYYY-MM-dd in your timezone.

Something like this:

ruby {
  code => "
    local_time = Time.now.getlocal('+01:00').strftime('%Y-%m-%d')
    event.set('[@metadata][local_time]', local_time)
  "
}

Then you change your output to use this field, something like this:

path => "/var/log/paloalto_5651/panos-system-%{[@metadata][local_time]}.log"

Since this is a @metadata field, it can be used in filters and conditionals, but will not be present in the final event.
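One caveat, not from the original reply: '+01:00' is a fixed offset, so with Europe/Berlin it will be wrong during summer time (CEST, UTC+2) and events near local midnight will land in the previous day's file. A quick plain-Ruby illustration of the pitfall:

```ruby
# A fixed UTC offset does not track daylight saving time.
# For an event at 22:30 UTC on a July day, Berlin local time (UTC+2)
# is already the next day, but a hard-coded +01:00 still yields the
# previous date.
t = Time.utc(2024, 7, 1, 22, 30)

winter_offset = t.getlocal('+01:00').strftime('%Y-%m-%d')  # what a hard-coded +01:00 computes
summer_offset = t.getlocal('+02:00').strftime('%Y-%m-%d')  # the correct CEST date

puts winter_offset  # "2024-07-01"
puts summer_offset  # "2024-07-02"
```

If the occasional one-hour skew around midnight matters to you, the offset string would need to be switched seasonally (or derived from a tz database lookup) rather than hard-coded.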


Leandro's code will give you Time.now in the local timezone. If you want @timestamp (the event time) in the local timezone instead, replace the Time.now call with the event's own timestamp, e.g. Time.at(event.get("[@timestamp]").to_f).
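Combining the two suggestions, a sketch (untested against your pipeline) of a ruby filter that derives the date from the event's @timestamp rather than the wall clock, so late-arriving events are filed under the day they occurred:

```ruby
ruby {
  code => "
    # @timestamp is always UTC; convert it to an epoch float, rebuild a
    # Ruby Time from it, and shift that to the desired fixed offset.
    event_time = Time.at(event.get('[@timestamp]').to_f)
    event.set('[@metadata][local_time]', event_time.getlocal('+01:00').strftime('%Y-%m-%d'))
  "
}
```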


Hi,

Thank you for the solution you provided. It worked, and I was able to resolve the issue. I appreciate your help.

ruby {
    code => "event.set('[@metadata][index]', event.get('[@timestamp]').time.getlocal('+03:00').strftime('%Y-%m-%d'))"
}


"/var/log/paloalto_5651/panos-undefined-%{[@metadata][index]}.log"