Palo-alto парсер логов

Здравствуйте, я новичок и не так хорошо ориентируюсь на сайте, поэтому прошу не судить строго.
Ситуация следующая:
Отправляю syslog с Palo Alto в Logstash: логи приходят, но не обрабатываются. Нашёл пару вариантов парсера для обработки, но ни один не дал результата.

 Syslog 988 USER.INFO: Oct  3 16:41:21 10.20.10.201  CEF:0|Palo Alto Networks|PAN-OS|6.0.0|end|TRAFFIC|1|\nrt=Oct 03 2022 11:41:21 GMT \ndeviceExternalId=012001030887 \nsrc=10.20.10.102 \ndst=5.2.65.241\nsourceTranslatedAddress=0.0.0.0 destinationTranslatedAddress=0.0.0.0 \ncs1Label=Rule \ncs1=URLWhitelist \nsuser=akyllytilsimat\v.sergachev \nduser= \napp=incomplete \ncs3Label=Virtual System cs3=vsys1 \ncs4=Trust \ncs5=Untrust \ndeviceInboundInterface=ethernet1/2 deviceOutboundInterface=ethernet1/1 \ncs6Label=LogProfile \ncs6=PaloSyslog \ncn1Label=SessionID\ncn1=67741 \ncnt=1 \nspt=54699 \ndpt=443 \nsourceTranslatedPort=0 destinationTranslatedPort=0 \nflexString1Label=Flags flexString1=0x1b \nproto=tcp \nact=allow \nflexNumber1Label=Total bytes flexNumber1=630 in=330\nout=300 \ncn2Label=Packets \ncn2=10 \nPanOSPacketsReceived=5 PanOSPacketsSent=5 \nstart=Oct 03 2022 11:41:21 GMT \ncn3Label=Elapsed time in seconds \ncn3=2 \ncs2Label=URL \nCategory cs2=any \nexternalId=7146523506911712157

Вот приходящий лог. Обратите внимание: сообщение начинается с `CEF:0|Palo Alto Networks|PAN-OS|...|TRAFFIC|...` и дальше идут пары `key=value`, то есть межсетевой экран отправляет syslog в формате CEF, а не в CSV-формате (поля через запятую), на который рассчитан парсер ниже — фильтр `csv` на таком сообщении ничего осмысленного не разберёт.

Ниже найденный парсер (он рассчитан на CSV-формат syslog PAN-OS и выполняется только для событий, у которых тег `panOS` уже проставлен на стадии input):

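# Logstash filter for PAN-OS (Palo Alto) syslog events.
#
# INPUT FORMAT — read this first. This filter parses the PAN-OS *CSV* syslog
# layout (the default log format of a Syslog Server Profile), where the 4th
# comma-separated column is the log type: TRAFFIC, THREAT, CONFIG, SYSTEM,
# HIP-MATCH. It does NOT parse CEF. A message like
#   CEF:0|Palo Alto Networks|PAN-OS|6.0.0|end|TRAFFIC|1|rt=... src=... dst=...
# is CEF (pipe-delimited header + key=value extensions); the csv filter below
# will only yield shifted/garbage fields or a `_csvparsefailure` tag for it.
# Either:
#   a) switch the firewall's Syslog Server Profile to the default (CSV) format
#      that these column lists describe, or
#   b) decode CEF at the input instead (e.g. `codec => cef` on the syslog/udp
#      input) and do not route those events through the csv branches below.
#
# The "panOS" tag is NOT set by this filter — it must be set on the input
# (e.g. `input { syslog { port => 5514 tags => ["panOS"] } }`). Without it the
# whole `if "panOS" in [tags]` block is skipped and events pass through
# unparsed, which is the most common reason for "logs arrive but are not
# processed".
#
# The column lists are version-specific (taken from the PAN-OS 9.0 docs).
# Check them against the documentation for the PAN-OS version actually running
# on the firewall: a missing or extra column shifts every field after it
# silently, so e.g. Action or Category end up holding the wrong value.
filter {
    if "panOS" in [tags] {

        # Log types are "TRAFFIC", "THREAT", "CONFIG", "SYSTEM" and "HIP-MATCH".
        #
        # The type is matched ONLY on the 4th CSV column (the first three —
        # FUTURE_USE, ReceiveTime, SerialNumber — are never quoted, so the anchor
        # is safe). Matching /TRAFFIC/ anywhere in the message lets attacker-
        # controlled content of a THREAT log (URL, file name, User-Agent, Referer)
        # steer the event into the TRAFFIC branch and mis-parse it: a cheap
        # log-evasion trick against the detections built on these fields.

        # Traffic log fields: https://docs.paloaltonetworks.com/pan-os/9-0/pan-os-admin/monitoring/use-syslog-for-monitoring/syslog-field-descriptions/traffic-log-fields.html
        if [message] =~ /^[^,]*,[^,]*,[^,]*,TRAFFIC,/ {
            csv {
                source => "message"
                columns => [
                    "FUTURE_USE", "ReceiveTime", "SerialNumber", "Type", "Threat_ContentType", "FUTURE_USE",
                    "GeneratedTime", "SourceIP", "DestinationIP", "NATSourceIP", "NATDestinationIP", "RuleName",
                    "SourceUser", "DestinationUser", "Application", "VirtualSystem", "SourceZone", "DestinationZone",
                    "InboundInterface", "OutboundInterface", "LogAction", "FUTURE_USE", "SessionID",
                    "RepeatCount", "SourcePort", "DestinationPort", "NATSourcePort", "NATDestinationPort", "Flags",
                    "Protocol", "Action", "Bytes", "BytesSent", "BytesReceived", "Packets", "StartTime", "ElapsedTime",
                    "Category", "FUTURE_USE", "SequenceNumber", "ActionFlags", "SourceLocation",
                    "DestinationLocation", "FUTURE_USE", "PacketsSent", "PacketsReceived", "SessionEndReason",
                    "DeviceGroupHierarchyLevel1", "DeviceGroupHierarchyLevel2", "DeviceGroupHierarchyLevel3",
                    "DeviceGroupHierarchyLevel4", "VirtualSystemName", "DeviceName", "ActionSource", "SourceVMUUID",
                    "DestinationVMUUID", "TunnelID_IMSI", "MonitorTag_IMEI", "ParentSessionID", "ParentStartTime",
                    "TunnelType", "SCTPAssociationID", "SCTPChunks", "SCTPChunksSent", "SCTPChunksReceived"
                ]
            }

            # Numeric fields as numbers so range queries / aggregations work.
            # (The original also "converted" GeoIP.* fields that never exist at this
            # point — the geoip filter runs later and writes to SourceIPGeo /
            # DestinationIPGeo — so those lines were dead and are dropped.)
            mutate {
                convert => {
                    "Bytes"              => "integer"
                    "BytesReceived"      => "integer"
                    "BytesSent"          => "integer"
                    "ElapsedTime"        => "integer"
                    "SourcePort"         => "integer"
                    "DestinationPort"    => "integer"
                    "NATDestinationPort" => "integer"
                    "NATSourcePort"      => "integer"
                    "Packets"            => "integer"
                    "PacketsReceived"    => "integer"
                    "PacketsSent"        => "integer"
                    "RepeatCount"        => "integer"
                    "SequenceNumber"     => "integer"
                }
                # Marks that a csv branch ran; the raw message is only dropped for
                # events carrying this tag.
                add_tag => [ "panOS_parsed" ]
            }
        }

        # Threat log fields: https://docs.paloaltonetworks.com/pan-os/9-0/pan-os-admin/monitoring/use-syslog-for-monitoring/syslog-field-descriptions/threat-log-fields.html
        else if [message] =~ /^[^,]*,[^,]*,[^,]*,THREAT,/ {
            csv {
                source => "message"
                columns => [
                    "FUTURE_USE", "ReceiveTime", "SerialNumber", "Type", "Threat_ContentType", "FUTURE_USE",
                    "GeneratedTime", "SourceIP", "DestinationIP", "NATSourceIP", "NATDestinationIP", "RuleName",
                    "SourceUser", "DestinationUser", "Application", "VirtualSystem", "SourceZone", "DestinationZone",
                    "InboundInterface", "OutboundInterface", "LogAction", "FUTURE_USE", "SessionID",
                    "RepeatCount", "SourcePort", "DestinationPort", "NATSourcePort", "NATDestinationPort", "Flags",
                    "Protocol", "Action", "URL_Filename", "ThreatID", "Category", "Severity", "Direction",
                    "SequenceNumber", "ActionFlags", "SourceLocation", "DestinationLocation", "FUTURE_USE",
                    "ContentType", "PCAP_ID", "FileDigest", "Cloud", "URLIndex", "UserAgent", "FileType",
                    "X-Forwarded-For", "Referer", "Sender", "Subject", "Recipient", "ReportID",
                    "DeviceGroupHierarchyLevel1", "DeviceGroupHierarchyLevel2", "DeviceGroupHierarchyLevel3",
                    "DeviceGroupHierarchyLevel4", "VirtualSystemName", "DeviceName", "FUTURE_USE", "SourceVMUUID",
                    "DestinationVMUUID", "HTTPMethod", "TunnelID_IMSI", "MonitorTag_IMEI", "ParentSessionID",
                    "ParentStartTime", "TunnelType", "ThreatCategory", "ContentVersion", "FUTURE_USE",
                    "SCTPAssociationID", "PayloadProtocolID", "HTTPHeaders"
                ]
            }

            mutate {
                convert => {
                    "SourcePort"         => "integer"
                    "DestinationPort"    => "integer"
                    "NATDestinationPort" => "integer"
                    "NATSourcePort"      => "integer"
                    "RepeatCount"        => "integer"
                    "SequenceNumber"     => "integer"
                }
                add_tag => [ "panOS_parsed" ]
            }
        }

        else {
            # CONFIG / SYSTEM / HIP-MATCH, a CEF-formatted line, or an unknown
            # layout: nothing is parsed here, so the raw message is kept intact.
            # The tag makes these events easy to find and count.
            mutate {
                add_tag => [ "panOS_unparsed" ]
            }
        }

        # Drop the raw message ONLY when it was actually parsed. The original
        # removed it unconditionally, which destroyed the only copy of every
        # event that matched no branch or failed csv parsing — an evidence loss
        # you cannot recover from during an incident.
        if "panOS_parsed" in [tags] and "_csvparsefailure" not in [tags] {
            mutate {
                remove_field => [ "message" ]
            }
        }

        # Use the firewall's own generation time as the event time, so timelines
        # show when the session happened rather than when Logstash received the
        # line (syslog buffering and outages otherwise shift events).
        if [GeneratedTime] {
            date {
                match => [ "GeneratedTime", "yyyy/MM/dd HH:mm:ss" ]
                # NOTE(review): pattern assumes PAN-OS CSV timestamps such as
                # "2022/10/03 16:41:21" — confirm for your PAN-OS version. On a
                # mismatch the event is tagged `_dateparsefailure` and keeps the
                # receive time. Add `timezone => "..."` if the firewall does not
                # log in the Logstash host's zone.
            }
        }

        # Geolocate public addresses only. Private (RFC1918), loopback (127/8 —
        # the original excluded just 127.0.0.1), link-local, CGNAT (100.64/10)
        # and 0.0.0.0 (what PAN-OS logs for "no NAT") are skipped: a lookup on
        # them only produces a `_geoip_lookup_failure` tag. Logstash conditionals
        # have no CIDR operator, so this is a string match on IPv4 prefixes.
        if [SourceIP] and [SourceIP] !~ /^(127\.|10\.|172\.(1[6-9]|2[0-9]|3[01])\.|192\.168\.|169\.254\.|100\.(6[4-9]|[7-9][0-9]|1[01][0-9]|12[0-7])\.|0\.)/ {
            geoip {
                source => "SourceIP"
                target => "SourceIPGeo"
            }

            # A 0,0 coordinate means "unknown" and would pile every such event
            # onto one map point; drop it. Nested fields are referenced as
            # [a][b] — the original's [SourceIPGeo.location] names a flat field
            # that never exists, so its cleanup never ran.
            # NOTE(review): assumes the non-ECS geoip layout {lat, lon} under
            # [target][location]; with ecs_compatibility the path is
            # [target][geo][location] — adjust accordingly.
            if [SourceIPGeo][location] and [SourceIPGeo][location][lat] == 0 and [SourceIPGeo][location][lon] == 0 {
                mutate {
                    remove_field => [ "[SourceIPGeo][location]" ]
                }
            }
        }

        if [DestinationIP] and [DestinationIP] !~ /^(127\.|10\.|172\.(1[6-9]|2[0-9]|3[01])\.|192\.168\.|169\.254\.|100\.(6[4-9]|[7-9][0-9]|1[01][0-9]|12[0-7])\.|0\.)/ {
            geoip {
                source => "DestinationIP"
                target => "DestinationIPGeo"
            }

            if [DestinationIPGeo][location] and [DestinationIPGeo][location][lat] == 0 and [DestinationIPGeo][location][lon] == 0 {
                mutate {
                    remove_field => [ "[DestinationIPGeo][location]" ]
                }
            }
        }

        # Flow fingerprint: SHA1-HMAC over the 5-tuple (src, sport, dst, dport,
        # proto), written to the default [fingerprint] field, so "top N flows"
        # queries can group on one term. This is a correlation ID, NOT a security
        # control: the key is hard-coded and public, so do not treat the value as
        # unforgeable. The key is kept so existing dashboards keep matching.
        if [SourceIP] and [DestinationIP] {
            fingerprint {
                concatenate_sources => true
                method => "SHA1"
                key => "logstash"
                source => [ "SourceIP", "SourcePort", "DestinationIP", "DestinationPort", "Protocol" ]
            }
        }

    }
}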
