Delayed Palo Alto logs

Hello,

I am completely new to ELK. The person who installed this tool is no longer in my organization, and I am putting a lot of effort into trying to keep the ELK solution working.

Excuse my English, as I am using Google Translate.

I would appreciate a little patience since, as I say, I am not an expert in ELK.

Currently everything works, but I see that the Palo Alto logs arrive with a delay of more than 8 hours.

Palo Alto sends the logs to a server, where they are stored in a path on the Linux server. I then edit a .conf file and modify a field called "path" to specify where the logs are located, and the ELK solution handles the normalization and parsing of the logs.

I only know that Kibana is at version 7.6.0 and I understand that Elasticsearch and Logstash are at the same version.

When I go into /etc/logstash/conf.d/ and open the file paloalto.conf, this is what it contains.

Could you help me identify what is causing the delay in the processing of the logs by ELK?

input {
    file {
        path => "xxx/xxx/xxx/xxx/xx/user.log"
        exclude => "*.gz"
        start_position => "beginning"
        tags => [ "PAN-OS_SysLog" ]
    }
}

filter {
    if "PAN-OS_SysLog" in [tags] {

        # Log types are "TRAFFIC", "THREAT", "CONFIG" and "SYSTEM". URL & WildFire logs are inside THREAT logs.
        # Log fields: https://www.paloaltonetworks.com/documentation/80/pan-os/pan-os/monitoring/syslog-field-descriptions

        if ([message] =~ /TRAFFIC/) {
            csv {
                source => "message"
                columns => [
                    "FUTURE_USE", "ReceiveTime", "SerialNumber", "Type", "Threat_ContentType", "FUTURE_USE",
                    "GeneratedTime", "SourceIP", "DestinationIP", "NATSourceIP", "NATDestinationIP", "RuleName",
                    "SourceUser", "DestinationUser", "Application", "VirtualSystem", "SourceZone", "DestinationZone",
                    "InboundInterface", "OutboundInterface", "LogForwardingProfile", "TimeLogged", "SessionID",
                    "RepeatCount", "SourcePort", "DestinationPort", "NATSourcePort", "NATDestinationPort", "Flags",
                    "Protocol", "Action", "Bytes", "BytesSent", "BytesReceived", "Packets", "StartTime", "ElapsedTime",
                    "URLCategory", "FUTURE_USE", "SequenceNumber", "ActionFlags", "SourceLocation",
                    "DestinationLocation", "FUTURE_USE", "PacketsSent", "PacketsReceived", "SessionEndReason",
                    "DeviceGroupHierarchyLevel1", "DeviceGroupHierarchyLevel2", "DeviceGroupHierarchyLevel3",
                    "DeviceGroupHierarchyLevel4", "VirtualSystemName", "DeviceName", "ActionSource", "SourceVMUUID",
                    "DestinationVMUUID", "TunnelID_IMSI", "MonitorTag_IMEI", "ParentSessionID", "ParentStartTime",
                    "TunnelType"
                ]
            }

            mutate {
                convert => [ "Bytes", "integer" ]
                convert => [ "BytesReceived", "integer" ]
                convert => [ "BytesSent", "integer" ]
                convert => [ "ElapsedTime", "integer" ]
                convert => [ "GeoIP.dma_code", "integer" ]
                convert => [ "GeoIP.latitude", "float" ]
                convert => [ "GeoIP.longitude", "float" ]
                convert => [ "NATDestinationPort", "integer" ]
                convert => [ "NATSourcePort", "integer" ]
                convert => [ "Packets", "integer" ]
                convert => [ "PacketsReceived", "integer" ]
                convert => [ "PacketsSent", "integer" ]
                convert => [ "SequenceNumber", "integer" ]
                replace => [ "host", "%{DeviceName}" ]
                add_tag => [ "PAN-OS_Traffic"]
                remove_field => ["message"]
            }
            # ruby {
            #     code => "event['GeneratedTime'] = event['GeneratedTime'].localtime('+08:00')"
            # }
        }

        else if ([message] =~ /THREAT/) {
            csv {
                source => "message"
                columns => [
                    "FUTURE_USE", "receive_time", "serial_number", "type", "threat_category", "version",
                    "GeneratedTime", "src_ip", "dest_ip", "src_translated_ip", "dest_translated_ip", "rule",
                    "src_user", "dest_ser", "application", "virtual_system", "src_zone", "dest_zone",
                    "src_interface", "dest_interface", "LogForwardingProfile", "FUTURE_USE", "session_id",
                    "repeat_count", "source_port", "dest_port", "src_translated_port", "dest_translated_port", "session_flags",
                    "protocol", "vendor_action", "misc", "threat", "raw_category", "severity", "direction",
                    "sequence_number", "action_flags", "client_location", "dest_location", "FUTURE_USE",
                    "ContentType", "pcap_id", "file_digest", "Cloud", "url_index", "user_agent", "file_type",
                    "X-Forwarded-For", "referer", "sender", "subject", "recipient", "FUTURE_USE",
                    "DeviceGroupHierarchyLevel1", "DeviceGroupHierarchyLevel2", "DeviceGroupHierarchyLevel3",
                    "DeviceGroupHierarchyLevel4", "vsys_name", "DeviceName", "FUTURE_USE", "SourceVMUUID",
                    "DestinationVMUUID", "HTTPMethod", "TunnelID_IMSI", "MonitorTag_IMEI", "ParentSessionID",
                    "ParentStartTime", "TunnelType", "category", "content_version", "FUTURE_USE", "FUTURE_USE",
                    "FUTURE_USE", "FUTURE_USE"
                ]
            }

            mutate {
                convert => [ "GeoIP.dma_code", "integer" ]
                convert => [ "GeoIP.latitude", "float" ]
                convert => [ "GeoIP.longitude", "float" ]
                convert => [ "NATDestinationPort", "integer" ]
                convert => [ "NATSourcePort", "integer" ]
                convert => [ "SequenceNumber", "integer" ]
                replace => [ "host", "%{DeviceName}" ]
                add_tag => ["PAN-OS_Threat"]
                remove_field => ["message"]
            }
        }

        else if ([message] =~ /CONFIG/) {
            csv {
                source => "message"
                columns => [
                    "FUTURE_USE", "ReceiveTime", "Serial_Number", "Type", "Subtype", "FUTURE_USE", "GeneratedTime", "Host",
                    "Virtual_System", "Command", "Admin", "Client", "Result", "Configuration_Path", "Sequence_Number",
                    "Action_Flags", "Before_Change_Detail", "After_Change_Detail", "Device Group Hierarchy Level 1",
                    "Device Group Hierarchy Level 2", "Virtual_System_Name", "DeviceName"
                ]
            }

            mutate {
                replace => [ "host", "%{DeviceName}" ]
                add_tag => [ "PAN-OS_Config"]
                remove_field => ["message"]
            }
        }

        else if ([message] =~ /CORRELATION/) {
            mutate {
                replace => [ "host", "%{DeviceName}" ]
                add_tag => [ "PAN-OS_Correlation"]
            }
        }

        else if ([message] =~ /SYSTEM/) {
            csv {
                source => "message"
                columns => [
                    "FUTURE_USE", "ReceiveTime", "Serial_Number", "Type", "Content/Threat_Type", "FUTURE_USE", "GeneratedTime",
                    "Virtual_System", "EventID", "Object", "FUTURE_USE", "FUTURE_USE", "Module", "Severity", "Description",
                    "Sequence_Number", "Action_Flags", "Device Group Hierarchy Level 1", "Device Group Hierarchy Level 2",
                    "Device Group Hierarchy Level 3", "Device Group Hierarchy Level 4", "Virtual_System_Name", "DeviceName", "Bytes", "Bytes Sent"
                ]
            }

            mutate {
                replace => [ "host", "%{DeviceName}" ]
                add_tag => [ "PAN-OS_System" ]
                # remove_field => ["message"]
            }
        }

        # The original message has been fully parsed, so it could be removed here:
        # mutate {
        #     remove_field => [ "message" ]
        # }

        # Geolocate logs that have SourceIP if that SourceIP is a non-RFC1918 address
        if [SourceIP] and [SourceIP] !~ "(^127\.0\.0\.1)|(^10\.)|(^172\.1[6-9]\.)|(^172\.2[0-9]\.)|(^172\.3[0-1]\.)|(^192\.168\.)|(^169\.254\.)" {
            geoip {
                source => "SourceIP"
                target => "SourceIPGeo"
            }

            # Clear SourceIPGeo location if it is 0,0
            if [SourceIPGeo][location] and [SourceIPGeo][location] =~ "0,0" {
                mutate {
                    replace => [ "[SourceIPGeo][location]", "" ]
                }
            }
        }

        # Geolocate logs that have DestinationIP and if that DestinationIP is a non-RFC1918 address
        if [DestinationIP] and [DestinationIP] !~ "(^127\.0\.0\.1)|(^10\.)|(^172\.1[6-9]\.)|(^172\.2[0-9]\.)|(^172\.3[0-1]\.)|(^192\.168\.)|(^169\.254\.)" {
            geoip {
                source => "DestinationIP"
                target => "DestinationIPGeo"
            }

            # Clear DestinationIPGeo location if it is 0,0
            if [DestinationIPGeo][location] and [DestinationIPGeo][location] =~ "0,0" {
                mutate {
                    replace => [ "[DestinationIPGeo][location]", "" ]
                }
            }
        }

    }
}

output {
    if "PAN-OS_SysLog" in [tags] {
        elasticsearch {
            index => "paloalto-%{+yyyy.MM.dd}"
            hosts => ["localhost:9200"]
        }
    }
}

Hi @juancamiloll, welcome to the community!

Can you show a sample result document from Discover?...

Show the @timestamp and original message fields.

I would not remove the message field while you are debugging; leave it in.

Most likely it is a @timestamp issue: it is not being set from the correct field, and/or it is combined with a timezone issue.

I do not see any logic to properly set @timestamp... otherwise it will just be set to the current time / timezone at the moment Logstash processes the event.

Which field do you believe should be the actual event time? GeneratedTime, ReceiveTime, or some other field?

Just for information: ALL dates / times are stored in UTC inside Elasticsearch but are shown in Kibana in the local timezone. (For example, an event stored as 2022-08-31T00:09:18 UTC is displayed as Aug 30, 2022 @ 19:09:18 in a GMT-5 browser.)

@timestamp	Aug 31, 2022 @ 09:54:38.683
@version	1
Action	allow
ActionFlags	0x8000000000000000
ActionSource	from-policy
Application	dns-base
Bytes	210
BytesReceived	129
BytesSent	81
DestinationIP	192.168.xx.xxx
DestinationLocation	192.168.0.0-192.168.255.255
DestinationPort	53
DestinationUser	xxxxx
DestinationVMUUID	 - 
DestinationZone	gesxx
DeviceGroupHierarchyLevel1	11
DeviceGroupHierarchyLevel2	0
DeviceGroupHierarchyLevel3	0
DeviceGroupHierarchyLevel4	0
DeviceName	PA-3250-PRIMARIO
ElapsedTime	0
FUTURE_USE	0
Flags	0x19
GeneratedTime	Aug 30, 2022 @ 19:09:18.000
InboundInterface	ae2.15
LogForwardingProfile	xxxxx
MonitorTag_IMEI	 - 
NATDestinationIP	0.0.0.0
NATDestinationPort	0
NATSourceIP	0.0.0.0
NATSourcePort	0
OutboundInterface	aexx
Packets	2
PacketsReceived	1
PacketsSent	1
ParentSessionID	0
ParentStartTime	 - 
Protocol	udp
ReceiveTime	Aug 30, 2022 @ 19:10:00.000
RepeatCount	1
RuleName	ACCESxxxxx
SequenceNumber	114,01x,6x3,634
SerialNumber	016301x0x440
SessionEndReason	aged-out
SessionID	590x7x
SourceIP	192.16x.xx.Xx
SourceLocation	192.168.0.0-192.168.255.255
SourcePort	598xx
SourceUser	xxxx
SourceVMUUID	 - 
SourceZone	Bd_xxx
StartTime	Aug 30, 2022 @ 19:08:46.000
Threat_ContentType	end
TimeLogged	Aug 30, 2022 @ 19:09:18.000
TunnelID_IMSI	0
TunnelType	N/A
Type	TRAFFIC
URLCategory	any
VirtualSystem	vsxx
VirtualSystemName	xxxx
_id	t5Jl9IIBmKNbxzgXIXqZX9x
_index	paloalto-2022.08.31
_score	1
_type	_doc
column100	 - 
column101	 - 
column102	 - 
column103	Dec 31, 1969 @ 19:00:00.000
column104	0
column105	0
column62	0
column63	0
column64	0
column65	0
column66	db18db8d-x89x-46Xxe-8d4xd-c4xd7b2x139b
column67	0
column68	0
column69	 - 
column70	 - 
column71	 - 
column72	 - 
column73	 - 
column74	 - 
column75	 - 
column76	0.0.0.0
column77	 - 
column78	 - 
column79	 - 
column80	 - 
column81	 - 
column82	 - 
column83	 - 
column84	 - 
column85	 - 
column86	 - 
column87	 - 
column88	 - 
column89	 - 
column90	 - 
column91	 - 
column92	 - 
column93	 - 
column94	 - 
column95	 - 
column96	 - 
column97	 - 
column98	 - 
column99	 - 
host	PA-3250-PRIMARIO
path	/xxx/xxx/xxx/xxxnorama/2022/08/31/user.log
tags	PAN-OS_SysLog, PAN-OS_Traffic
  1. To preserve the message field, I must remove the # character, right?

  2. GeneratedTime

  3. Raw log:

Aug 31 10:05:26 Panorama 1,2022/08/31 10:05:26,016301006440,TRAFFIC,end,2305,2022/08/31 10:04:34,172.28.138.17,192.168.204.19,0.0.0.0,0.0.0.0,ACCES_xxx,domian\useruser,domain\user2,dnsx,vsyxx,xxxxxx,gestion,aeX.1,aex.3,FW_xxxx_PAN,2022/08/31 10:04:34,1446652,1,59084,53,0,0,0x19,udp,allow,150,75,75,2,2022/08/31 10:04:02,0,any,0,114088245692,0x8000000000000000,172.16.0.0-172.31.255.255,192.168.0.0-192.168.255.255,0,1,1,aged-out,11,0,0,0,ASDADS,PA-3250-PRxxxx,from-policy,,,0,,0,,N/A,0,0,0,0,bfffce57-d05f-48x6Xbxf7-0eae3983763a,0,0,,,,,,,,0.0.0.0,,,,,,,,,,,,,,,,,,,,,,,,,,,1969-12-31T19:00:00.000-05:00,0,0

First, I forgot to say... 7.6 is very, very old; you should think about upgrading.

Second, this appears to be a TRAFFIC log, so let's focus on that.

  1. To preserve the message field, I must remove the # character, right?

No... To keep the message field you comment out the remove_field:

        if ([message] =~ /TRAFFIC/) {
            csv {
                ....
            }
            mutate {
                convert => [ "Bytes", "integer" ]
                ...
                add_tag => [ "PAN-OS_Traffic" ]
                # remove_field => ["message"]   <---- HERE: comment this out
            }
        }
  2. GeneratedTime

You will need to add some code near the bottom of the filter block... put it above this line:

# Geolocate logs that have SourceIP if that SourceIP is a non-RFC1918 address

Look at the date filter documentation here: https://www.elastic.co/guide/en/logstash/current/plugins-filters-date.html

Add something like this, because your time is in this format:

2022/08/31 10:05:26

filter {
    date {
        match => [ "GeneratedTime", "yyyy/MM/dd HH:mm:ss" ]
    }
}

Then you may need to figure out the timezone... the date filter also has a timezone option, so you may want to add that to adjust.
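
For illustration, a minimal sketch with the timezone option added (the zone value here is a placeholder assumption; set it to whatever zone the firewall's clock actually uses):

filter {
    date {
        # Parse GeneratedTime into @timestamp, interpreting it in the given zone.
        match => [ "GeneratedTime", "yyyy/MM/dd HH:mm:ss" ]
        # Placeholder: replace with the firewall's actual timezone.
        timezone => "Etc/UTC"
    }
}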

BTW, we have a Filebeat module that does all of this...

Thank you for your help. I will apply what you have recommended and report back.

Question 1:
When I query a 15-minute range, the number of logs received varies between 1,500,000 and 2,500,000. In which cases could a bottleneck occur?

Question 2:
Going from the version I currently have to a more recent one, are the configuration changes significant?

Question 1) Yes, there could be a bottleneck for many reasons... or perhaps that is just your traffic pattern (2,500,000 logs in 15 minutes works out to roughly 2,800 events/second). The bottleneck could be Logstash, Elasticsearch, or both...
How big is your Elasticsearch cluster in terms of nodes, RAM, CPU, etc.? What kind of disk?

Question 2) Depends which configuration you are talking about.
You should try to get to 7.17.6; there will be only minor configuration changes...
There are tons of release notes... but not a lot of required configuration changes, depending on the complexity of your cluster, etc.

The Logstash config needs really no changes.

@juancamiloll Some advice has already been given, but here is some honest advice from me: if I were you, I'd do the following ASAP:

  1. Update your stack to 7.17.6. The version you are using has multiple vulnerabilities and problems.
  2. Have a look at the panw.panos Filebeat module. You can configure Filebeat to handle the Palo Alto logs directly with a syslog input, so there is no need to store the logs on disk anymore (unless you need that for some reason). Using this module also decreases the need for Logstash, as Filebeat can ingest directly into Elasticsearch (unless you need Logstash for extra enrichment, for example with jdbc lookups). A minimal sketch follows below.
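
For reference, a minimal sketch of what the module configuration (modules.d/panw.yml) could look like with the syslog input; the listen address and port here are assumptions (9001 is the module's documented default):

- module: panw
  panos:
    enabled: true
    # The input defaults to syslog. The address/port below are assumptions;
    # point the firewall's syslog forwarding at this host and port.
    var.syslog_host: 0.0.0.0
    var.syslog_port: 9001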

Good luck and best regards,

Willem

@stephenb

  1. It is a machine in Azure with 8 Intel(R) Xeon(R) Platinum 8171M CPUs @ 2.60GHz, 32 GB of RAM, and an HDD-type disk.

Do you think it could be a server performance issue?

  2. Thanks. I suppose that if I don't know how to use an old version, I doubt I can handle a more recent one.

Hello, thank you very much for taking the time to respond to my request for help.

Currently we receive the logs through syslog, and they are sent both to the ELK solution and to another SIEM, Splunk, which runs queries against them to optimize license consumption.

Updating ELK to another version runs the risk that some APIs I currently have working will stop doing so. I guess I will have to start reading up on the newer versions of ELK.

I will investigate what you mention about Filebeat.

If your logs are exactly and always 8 hours off, you probably have a time zone mismatch: your device may be logging in UTC, but it is being interpreted as local time. What is your time zone? The earlier request to include the message field will help us too.

I wouldn't chase bottlenecks before looking for a data problem.

Hello, thank you very much for answering.

The delay continues to increase; at this moment it is 10 hours.

My time zone is GMT-5.

Indexing into Elasticsearch can be I/O intensive. If you are experiencing bottlenecks with Elasticsearch and have determined that it is not the indexing pipeline limiting throughput, I would start looking at the disk, as SSDs are recommended for Elasticsearch.

Also, it looks to me like you are running Logstash and Elasticsearch on the same machine. That can result in them competing for resources (CPU and RAM).

All 8 CPUs are showing 100%; Logstash is the biggest load and appears to be using 5 of the 8 on its own. If you can add processors to this VM, it might be a quick temporary fix. However, fixing one bottleneck will probably expose the next one.

In my experience, if a server has enough processors, you can run both Logstash and Elasticsearch on the same host, but you want EXCESS CPU available.

Unless you have a specific use case for Logstash, I'd recommend checking out our Elastic Agent and our latest Palo Alto integration. It includes support for all PANW event types (traffic, threat, user-id, HIP match, GlobalProtect, and more). All the processing is done within an ingest pipeline, which may alleviate some of the issues you are experiencing with Logstash.

Did you change your pipeline to use the GeneratedTime field as the value for the @timestamp field, as proposed by @stephenb?

This will at least make your documents have the correct time and not the time they were processed by Logstash.
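
For reference, a sketch of where that date filter would sit in the existing paloalto.conf (the surrounding blocks are abbreviated; a timezone option may still be needed, as noted above):

filter {
    if "PAN-OS_SysLog" in [tags] {

        # ... existing csv / mutate blocks for each log type ...

        # Set @timestamp from the firewall's GeneratedTime instead of the
        # time Logstash happened to process the line.
        date {
            match => [ "GeneratedTime", "yyyy/MM/dd HH:mm:ss" ]
        }

        # ... existing geoip blocks ...
    }
}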

Your main issue here is that you are running a couple of intensive applications on a small server. Your Palo Alto pipeline, for example, is reading from a file input and, from what you shared, handling close to 2,000 events/second on its own; you also have Elasticsearch on the same machine, and from your screenshot it seems you are running Splunk on the same server as well.

You are also using an HDD, which can be quite slow for the amount of data you need to read and write.

I would say that the high load on your server is probably caused by I/O wait; you should first try to split your services across more machines.
