Failed to parse date field

I currently have a problem with an error message that refers to a date field which is not being detected as such.

[2023-02-11T15:16:39,111][WARN ][logstash.outputs.elasticsearch][main] Could not index event to Elasticsearch. {:status=>400, :action=>["index", {:_id=>nil, :_index=>"paloalto-2023.02.11", :routing=>nil, :_type=>"_doc"}, #<LogStash::Event:0x39a3daeb>], :response=>{"index"=>{"_index"=>"paloalto-2023.02.11", "_type"=>"_doc", "_id"=>"pp8fQoYBLVrlAEewYqTX", "status"=>400, "error"=>{"type"=>"mapper_parsing_exception", "reason"=>"failed to parse field [column108] of type [date] in document with id 'pp8fQoYBLVrlAEewYqTX'. Preview of field's value: 'network-protocol'", "caused_by"=>{"type"=>"illegal_argument_exception", "reason"=>"failed to parse date field [network-protocol] with format [strict_date_optional_time||epoch_millis]", "caused_by"=>{"type"=>"date_time_parse_exception", "reason"=>"Failed to parse with all enclosed parsers"}}}}}}

When I go into Kibana via Management > Index patterns and select the paloalto index, I find the field, and it is indeed marked as being in conflict.

There is no option to assign it the type "date".

What is stranger to me is that I haven't touched the configuration file in more than 15 days, and only today did it start showing problems. I can see that the THREAT and SYSTEM logs are processed, but the TRAFFIC logs are not.

As I mentioned before, I am not an expert in ELK, so I would appreciate any suggestions you can give me.

Thank you

In the current index, [column108] is mapped as a date. The string "network-protocol" cannot be parsed as a date.
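
You can confirm how that field is currently mapped with a quick request (the index name here is taken from your error message; adjust as needed):

GET paloalto-2023.02.11/_mapping/field/column108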

So some of your events have the wrong data in them. If the first document indexed into a new daily index passes date detection, the column will be mapped as a date. If the first document that arrives has "network-protocol" in that column, it will be mapped as text.

Once the mapping type is set, it cannot be changed. You can use an index template to tell Elasticsearch that the column should be a date. You will still get the mapping exception (unless you fix your data), but none of your indices will have the column mapped as text.
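
A minimal sketch of such a template, assuming Elasticsearch 7.8+ composable index templates and your paloalto-* daily indices (the field name comes from your error message):

PUT _index_template/paloalto
{
  "index_patterns": ["paloalto-*"],
  "template": {
    "mappings": {
      "properties": {
        "column108": { "type": "date" }
      }
    }
  }
}

Note that a template only applies to indices created after it exists; today's index keeps its current mapping until the daily index rolls over.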

Can you share your Logstash pipeline? There is something weird here, as the TRAFFIC csv events from PAN-OS don't have 108 fields; they have around 75 columns.

Since you have a field named column108, there is something wrong in the parsing of the csv message from PAN-OS.
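
One way to see how far the parsing is spilling past the expected columns is to list the autogenerated columnN fields and their mapped types (an illustrative field_caps query; adjust the index pattern to yours):

GET paloalto-*/_field_caps?fields=column*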

But your current issue is related to what Badger said.

Hello, thank you for your reply @leandrojmp.

I'm sorry if I misunderstood; I think this is the information you asked for. If not, please tell me how and where I can get it.

input {
    file {
        path => "/opt/log/soc/mypath/2023/02/*/user.log"
        exclude => "*.gz"
        start_position => "beginning"
        tags => [ "PAN-OS_SysLog" ]
    }
}

filter {
    if "PAN-OS_SysLog" in [tags] {

        # Log types are "TRAFFIC", "THREAT", "CONFIG" and "SYSTEM". URL & Wildfire logs are inside Threat logs.
        # Log fields: https://www.paloaltonetworks.com/documentation/80/pan-os/pan-os/monitoring/syslog-field-descriptions

        if ([message] =~ /TRAFFIC/) {
            csv {
                source => "message"
                columns => [
                    "FUTURE_USE", "ReceiveTime", "SerialNumber", "Type", "Threat_ContentType", "FUTURE_USE",
                    "GeneratedTime", "SourceIP", "DestinationIP", "NATSourceIP", "NATDestinationIP", "RuleName",
                    "SourceUser", "DestinationUser", "Application", "VirtualSystem", "SourceZone", "DestinationZone",
                    "InboundInterface", "OutboundInterface", "LogForwardingProfile", "TimeLogged", "SessionID",
                    "RepeatCount", "SourcePort", "DestinationPort", "NATSourcePort", "NATDestinationPort", "Flags",
                    "Protocol", "Action", "Bytes", "BytesSent", "BytesReceived", "Packets", "StartTime", "ElapsedTime",
                    "URLCategory", "FUTURE_USE", "SequenceNumber", "ActionFlags", "SourceLocation",
                    "DestinationLocation", "FUTURE_USE", "PacketsSent", "PacketsReceived", "SessionEndReason",
                    "DeviceGroupHierarchyLevel1", "DeviceGroupHierarchyLevel2", "DeviceGroupHierarchyLevel3",
                    "DeviceGroupHierarchyLevel4", "VirtualSystemName", "DeviceName", "ActionSource", "SourceVMUUID",
                    "DestinationVMUUID", "TunnelID_IMSI", "MonitorTag_IMEI", "ParentSessionID", "ParentStartTime",
                    "TunnelType"
                ]
            }

            mutate {
                convert => [ "Bytes", "integer" ]
                convert => [ "BytesReceived", "integer" ]
                convert => [ "BytesSent", "integer" ]
                convert => [ "ElapsedTime", "integer" ]
                convert => [ "GeoIP.dma_code", "integer" ]
                convert => [ "GeoIP.latitude", "float" ]
                convert => [ "GeoIP.longitude", "float" ]
                convert => [ "NATDestinationPort", "integer" ]
                convert => [ "NATSourcePort", "integer" ]
                convert => [ "Packets", "integer" ]
                convert => [ "PacketsReceived", "integer" ]
                convert => [ "PacketsSent", "integer" ]
                convert => [ "SequenceNumber", "integer" ]
                replace => [ "host", "%{DeviceName}" ]
                add_tag => [ "PAN-OS_Traffic"]
                remove_field => ["message"]
            }
            # ruby {
            #     code => "event['GeneratedTime'] = event['GeneratedTime'].localtime('+08:00')"
            # }
        }

        else if ([message] =~ /THREAT/) {
            csv {
                source => "message"
                columns => [
                    "FUTURE_USE", "receive_time", "serial_number", "type", "threat_category", "version",
                    "GeneratedTime", "src_ip", "dest_ip", "src_translated_ip", "dest_translated_ip", "rule",
                    "src_user", "dest_ser", "application", "virtual_system", "src_zone", "dest_zone",
                    "src_interface", "dest_interface", "LogForwardingProfile", "FUTURE_USE", "session_id",
                    "repeat_count", "source_port", "dest_port", "src_translated_port", "dest_translated_port", "session_flags",
                    "protocol", "vendor_action", "misc", "threat", "raw_category", "severity", "direction",
                    "sequence_number", "action_flags", "client_location", "dest_location", "FUTURE_USE",
                    "ContentType", "pcap_id", "file_digest", "Cloud", "url_index", "user_agent", "file_type",
                    "X-Forwarded-For", "referer", "sender", "subject", "recipient", "FUTURE_USE",
                    "DeviceGroupHierarchyLevel1", "DeviceGroupHierarchyLevel2", "DeviceGroupHierarchyLevel3",
                    "DeviceGroupHierarchyLevel4", "vsys_name", "DeviceName", "FUTURE_USE", "SourceVMUUID",
                    "DestinationVMUUID", "HTTPMethod", "TunnelID_IMSI", "MonitorTag_IMEI", "ParentSessionID",
                    "ParentStartTime", "TunnelType", "category", "content_version", "FUTURE_USE", "FUTURE_USE",
                    "FUTURE_USE", "FUTURE_USE"
                ]
            }

            mutate {
                convert => [ "GeoIP.dma_code", "integer" ]
                convert => [ "GeoIP.latitude", "float" ]
                convert => [ "GeoIP.longitude", "float" ]
                convert => [ "NATDestinationPort", "integer" ]
                convert => [ "NATSourcePort", "integer" ]
                convert => [ "SequenceNumber", "integer" ]
                replace => [ "host", "%{DeviceName}" ]
                add_tag => ["PAN-OS_Threat"]
                remove_field => ["message"]
            }
        }

        else if ([message] =~ /CONFIG/) {
            csv {
                source => "message"
                columns => [
                    "FUTURE_USE", "ReceiveTime", "Serial_Number", "Type", "Subtype", "FUTURE_USE", "GeneratedTime", "Host",
                    "Virtual_System", "Command", "Admin", "Client", "Result", "Configuration_Path", "Sequence_Number",
                    "Action_Flags", "Before_Change_Detail", "After_Change_Detail", "Device Group Hierarchy Level 1",
                    "Device Group Hierarchy Level 2", "Virtual_System_Name", "DeviceName"
                ]
            }

            mutate {
                replace => [ "host", "%{DeviceName}" ]
                add_tag => [ "PAN-OS_Config"]
                remove_field => ["message"]
            }
        }

        else if ([message] =~ /CORRELATION/) {
            mutate {
                replace => [ "host", "%{DeviceName}" ]
                add_tag => [ "PAN-OS_Correlation"]
            }
        }

        else if ([message] =~ /SYSTEM/) {
            csv {
                source => "message"
                columns => [
                    "FUTURE_USE", "ReceiveTime", "Serial_Number", "Type", "Content/Threat_Type", "FUTURE_USE", "GeneratedTime",
                    "Virtual_System", "EventID", "Object", "FUTURE_USE", "FUTURE_USE", "Module", "Severity", "Description",
                    "Sequence_Number", "Action_Flags", "Device Group Hierarchy Level 1", "Device Group Hierarchy Level 2",
                    "Device Group Hierarchy Level 3", "Device Group Hierarchy Level 4", "Virtual_System_Name", "DeviceName", "Bytes", "Bytes Sent"
                ]
            }

            mutate {
                replace => [ "host", "%{DeviceName}"]
                add_tag => [ "PAN-OS_System"]
#remove_field => ["message"]
            }
        }

        mutate {
            # Original message has been fully parsed, so remove it.
            #remove_field => [ "message" ]
        }

        # Geolocate logs that have SourceIP if that SourceIP is a non-RFC1918 address
        if [SourceIP] and [SourceIP] !~ "(^127\.0\.0\.1)|(^10\.)|(^172\.1[6-9]\.)|(^172\.2[0-9]\.)|(^172\.3[0-1]\.)|(^192\.168\.)|(^169\.254\.)" {
            geoip {
               source => "SourceIP"
               target => "SourceIPGeo"
          }

            # Clear [SourceIPGeo][location] if it is 0,0
            if ([SourceIPGeo][location] and [SourceIPGeo][location] =~ "0,0") {
                mutate {
                    replace => [ "[SourceIPGeo][location]", "" ]
                }
            }
        }

        # Geolocate logs that have DestinationIP and if that DestinationIP is a non-RFC1918 address
        if [DestinationIP] and [DestinationIP] !~ "(^127\.0\.0\.1)|(^10\.)|(^172\.1[6-9]\.)|(^172\.2[0-9]\.)|(^172\.3[0-1]\.)|(^192\.168\.)|(^169\.254\.)" {
            geoip {
                source => "DestinationIP"
                target => "DestinationIPGeo"
            }

            # Clear [DestinationIPGeo][location] if it is 0,0
            if ([DestinationIPGeo][location] and [DestinationIPGeo][location] =~ "0,0") {
                mutate {
                    replace => [ "[DestinationIPGeo][location]", "" ]
                }
            }
        }

    }
}

output {
    if "PAN-OS_SysLog" in [tags] {
        elasticsearch {
            index => "paloalto-%{+yyyy.MM.dd}"
            hosts => ["localhost:9200"]
        }
    }
}

@Badger Thank you for taking the time to answer my question, although I must admit, somewhat embarrassed, that I did not understand any of it.

The strangest thing is that the error has disappeared and I have not modified anything.

That says that if "CONFIG" appears anywhere in the syslog message then you are going to parse it as if it were a CONFIG message. But there are other messages that could contain the string "CONFIG" (or "SYSTEM", etc.) somewhere inside them.

You could use if ([message] =~ /,CONFIG,/), but it would be cleaner to parse off the first four fields that (I think) are common to all messages ("FUTURE_USE", "ReceiveTime", "Serial_Number", "Type") and then determine the rest of the parsing using the [Type] field, as in the sketch below.
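
A minimal sketch of that approach (the dissect mapping and the comparison values are illustrative, not tested against your data):

filter {
    # Parse only the leading fields that all PAN-OS log types share;
    # %{rest} keeps the remainder of the line untouched.
    dissect {
        mapping => { "message" => "%{FUTURE_USE},%{ReceiveTime},%{SerialNumber},%{Type},%{rest}" }
    }

    # Dispatch on the exact log type instead of matching anywhere in the message.
    if [Type] == "TRAFFIC" {
        # csv { ... your existing TRAFFIC columns ... }
    } else if [Type] == "THREAT" {
        # csv { ... your existing THREAT columns ... }
    }
}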

What I think is happening is that one of those sections is incorrectly matching two other types of messages. The csv filter will use generated column names like column108 if the columns option does not consume all the columns in the event. In most of those incorrectly matched messages column108 happens to parse as a date, but in a few cases it is a string.
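
To illustrate that csv behavior (a contrived example, not from your data): if the columns option names fewer fields than the event has values, the extras get autogenerated positional names.

filter {
    csv {
        source => "message"
        columns => ["a", "b"]
    }
}
# An input line "1,2,3" yields a => "1", b => "2", plus an autogenerated column3 => "3".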

If that is true then fixing the parsing will avoid the mapping errors and you will not have to add a template.
