ELK v7.6.0 and Palo Alto: process only certain types of logs

I am working with ELK v 7.6.0

I have asked the Palo Alto firewall administrator to send me the logs via syslog on port 514 to the server where I run ELK.

On the Linux server, I added 2 lines to /etc/rsyslog.conf so that incoming logs are stored in folders organized by year, month and day.

Then, in the panos.conf file under /etc/logstash/conf.d, I tell Logstash to read and parse the logs from the path where I am receiving them. So far, everything works as expected.

Paloalto has several types of logs such as TRAFFIC, THREAT, SYSTEM, USERID, among others.

My question is whether I can somehow tell the Logstash configuration file to process only, for example, the SYSTEM type and omit the other two types.

input {
    # Read the PAN-OS syslog files that rsyslog writes into per-day folders.
    file {
        path => "/var/log/PANOS/system-PANOS1/2023/11/*/user1.log"
        # Skip rotated/compressed archives.
        exclude => "*.gz"
        start_position => "beginning"
        # Tag used by the filter and output sections to select these events.
        tags => [ "PAN-OS_SysLog" ]
    }
}

filter {
    # Only events tagged by the PAN-OS file input are handled here.
    if "PAN-OS_SysLog" in [tags] {

        # Log types are "TRAFFIC", "THREAT", "CONFIG" and "SYSTEM". URL & Wildfire logs are inside Threat logs
        # Log fields: https://www.paloaltonetworks.com/documentation/80/pan-os/pan-os/monitoring/syslog-field-descriptions
        #
        # The log type is detected by a substring match on the raw message, and the
        # branches are evaluated in order, so the first matching type wins.

        if ([message] =~ /TRAFFIC/) {
            csv {
                source => "message"
                columns => [
                    "FUTURE_USE", "ReceiveTime", "SerialNumber", "Type", "Threat_ContentType", "FUTURE_USE",
                    "GeneratedTime", "SourceIP", "DestinationIP", "NATSourceIP", "NATDestinationIP", "RuleName",
                    "SourceUser", "DestinationUser", "Application", "VirtualSystem", "SourceZone", "DestinationZone",
                    "InboundInterface", "OutboundInterface", "LogForwardingProfile", "TimeLogged", "SessionID",
                    "RepeatCount", "SourcePort", "DestinationPort", "NATSourcePort", "NATDestinationPort", "Flags",
                    "Protocol", "Action", "Bytes", "BytesSent", "BytesReceived", "Packets", "StartTime", "ElapsedTime",
                    "URLCategory", "FUTURE_USE", "SequenceNumber", "ActionFlags", "SourceLocation",
                    "DestinationLocation", "FUTURE_USE", "PacketsSent", "PacketsReceived", "SessionEndReason",
                    "DeviceGroupHierarchyLevel1", "DeviceGroupHierarchyLevel2", "DeviceGroupHierarchyLevel3",
                    "DeviceGroupHierarchyLevel4", "VirtualSystemName", "DeviceName", "ActionSource", "SourceVMUUID",
                    "DestinationVMUUID", "TunnelID_IMSI", "MonitorTag_IMEI", "ParentSessionID", "ParentStartTime",
                    "TunnelType"
                ]
            }

            mutate {
                convert => [ "Bytes", "integer" ]
                convert => [ "BytesReceived", "integer" ]
                convert => [ "BytesSent", "integer" ]
                convert => [ "ElapsedTime", "integer" ]
                # NOTE(review): no "GeoIP.*" fields are created in this file (geoip below
                # targets SourceIPGeo / DestinationIPGeo) - confirm these three are still needed.
                convert => [ "GeoIP.dma_code", "integer" ]
                convert => [ "GeoIP.latitude", "float" ]
                convert => [ "GeoIP.longitude", "float" ]
                convert => [ "NATDestinationPort", "integer" ]
                convert => [ "NATSourcePort", "integer" ]
                convert => [ "Packets", "integer" ]
                convert => [ "PacketsReceived", "integer" ]
                convert => [ "PacketsSent", "integer" ]
                convert => [ "SequenceNumber", "integer" ]
                replace => [ "host", "%{DeviceName}" ]
                add_tag => [ "PAN-OS_Traffic"]
            }
        } else if ([message] =~ /THREAT/) {
            csv {
                source => "message"
                columns => [
                    "FUTURE_USE", "receive_time", "serial_number", "type", "threat_category", "version",
                    "GeneratedTime", "src_ip", "dest_ip", "src_translated_ip", "dest_translated_ip", "rule",
                    "src_user", "dest_ser", "application", "virtual_system", "src_zone", "dest_zone",
                    "src_interface", "dest_interface", "LogForwardingProfile", "FUTURE_USE", "session_id",
                    "repeat_count", "source_port", "dest_port", "src_translated_port", "dest_translated_port", "session_flags",
                    "protocol", "vendor_action", "misc", "threat", "raw_category", "severity", "direction",
                    "sequence_number", "action_flags", "client_location", "dest_location", "FUTURE_USE",
                    "ContentType", "pcap_id", "file_digest", "Cloud", "url_index", "user_agent", "file_type",
                    "X-Forwarded-For", "referer", "sender", "subject", "recipient", "FUTURE_USE",
                    "DeviceGroupHierarchyLevel1", "DeviceGroupHierarchyLevel2", "DeviceGroupHierarchyLevel3",
                    "DeviceGroupHierarchyLevel4", "vsys_name", "DeviceName", "FUTURE_USE", "SourceVMUUID",
                    "DestinationVMUUID", "HTTPMethod", "TunnelID_IMSI", "MonitorTag_IMEI", "ParentSessionID",
                    "ParentStartTime", "TunnelType", "category", "content_version", "FUTURE_USE", "FUTURE_USE",
                    "FUTURE_USE", "FUTURE_USE"
                ]
            }

            mutate {
                # NOTE(review): the THREAT column list above uses snake_case names
                # (e.g. "sequence_number"), so the NAT*/SequenceNumber conversions below
                # do not correspond to any column parsed in this branch - confirm intent.
                convert => [ "GeoIP.dma_code", "integer" ]
                convert => [ "GeoIP.latitude", "float" ]
                convert => [ "GeoIP.longitude", "float" ]
                convert => [ "NATDestinationPort", "integer" ]
                convert => [ "NATSourcePort", "integer" ]
                convert => [ "SequenceNumber", "integer" ]
                replace => [ "host", "%{DeviceName}" ]
                add_tag => ["PAN-OS_Threat"]
            }
        } else if ([message] =~ /CONFIG/) {
            csv {
                source => "message"
                columns => [
                    "FUTURE_USE", "ReceiveTime", "Serial_Number", "Type", "Subtype", "FUTURE_USE", "GeneratedTime", "Host",
                    "Virtual_System", "Command", "Admin", "Client", "Result", "Configuration_Path", "Sequence_Number",
                    "Action_Flags", "Before_Change_Detail", "After_Change_Detail", "Device Group Hierarchy Level 1",
                    "Device Group Hierarchy Level 2", "Virtual_System_Name", "DeviceName"
                ]
            }

            mutate {
                replace => [ "host", "%{DeviceName}" ]
                add_tag => [ "PAN-OS_Config"]
            }
        } else if ([message] =~ /CORRELATION/) {
            # NOTE(review): this branch has no csv filter, so DeviceName is not parsed
            # here before being substituted into "host" - confirm this is intended.
            mutate {
                replace => [ "host", "%{DeviceName}" ]
                add_tag => [ "PAN-OS_Correlation"]
            }
        } else if ([message] =~ /SYSTEM/) {
            csv {
                source => "message"
                columns => [
                    "FUTURE_USE", "ReceiveTime", "Serial_Number", "Type", "Content/Threat_Type", "FUTURE_USE", "GeneratedTime",
                    "Virtual_System", "EventID", "Object", "FUTURE_USE", "FUTURE_USE", "Module", "Severity", "Description",
                    "Sequence_Number", "Action_Flags", "Device Group Hierarchy Level 1", "Device Group Hierarchy Level 2",
                    "Device Group Hierarchy Level 3", "Device Group Hierarchy Level 4", "Virtual_System_Name", "DeviceName"
                ]
            }

            mutate {
                replace => [ "host", "%{DeviceName}" ]
                add_tag => [ "PAN-OS_System"]
            }
        }

        mutate {
            # Original message has been fully parsed, so remove it.
            #remove_field => [ "message" ]
        }

        # Geolocate logs that have SourceIP if that SourceIP is a non-RFC1918 address
        if [SourceIP] and [SourceIP] !~ "(^127\.0\.0\.1)|(^10\.)|(^172\.1[6-9]\.)|(^172\.2[0-9]\.)|(^172\.3[0-1]\.)|(^192\.168\.)|(^169\.254\.)" {
            geoip {
               source => "SourceIP"
               target => "SourceIPGeo"
            }

            # Delete 0,0 in SourceIPGeo.location if equal to 0,0
            # NOTE(review): "[SourceIPGeo.location]" names a top-level field with a dot in
            # its name; geoip writes under the nested target, so verify this check ever matches.
            if ([SourceIPGeo.location] and [SourceIPGeo.location] =~ "0,0") {
                mutate {
                    replace => [ "SourceIPGeo.location", "" ]
                }
            }
        }

        # Geolocate logs that have DestinationIP and if that DestinationIP is a non-RFC1918 address
        if [DestinationIP] and [DestinationIP] !~ "(^127\.0\.0\.1)|(^10\.)|(^172\.1[6-9]\.)|(^172\.2[0-9]\.)|(^172\.3[0-1]\.)|(^192\.168\.)|(^169\.254\.)" {
            geoip {
                source => "DestinationIP"
                target => "DestinationIPGeo"
            }

            # Delete 0,0 in DestinationIPGeo.location if equal to 0,0
            # NOTE(review): same dotted-field caveat as for SourceIPGeo.location above.
            if ([DestinationIPGeo.location] and [DestinationIPGeo.location] =~ "0,0") {
                mutate {
                    replace => [ "DestinationIPGeo.location", "" ]
                }
            }
        }
    }
}


output {
    # Print every event to stdout.
    stdout {}

    # Ship only the PAN-OS tagged events to Elasticsearch, one index per day.
    if "PAN-OS_SysLog" in [tags] {
        elasticsearch {
            index => "panos%{+yyyy.MM.dd}"
            hosts => ["myhostIP:9200"]
        }
    }
}

You mean only process logs from SYSTEM and drop everything else? It is not clear what you want to achieve.

If that is what you want, you can just use a conditional, like the ones you are already using to decide which message to parse.

# Place this inside the filter section, before any parsing:
# events whose raw message does not contain "SYSTEM" are discarded.
if !([message] =~ /SYSTEM/) {
    drop {}
}

This will drop every message that is not a SYSTEM log

I will try it as you suggest, thanks

