Hello,
I am completely new to ELK. The person who installed this tool is no longer in my organization, and I am putting a lot of effort into keeping the ELK solution working.
Excuse my English as I'm using google translate.
I would appreciate a little patience since, as I said, I am not an expert in ELK.
Currently everything works, but I see that the Palo Alto logs arrive in ELK with a delay of more than 8 hours.
The Palo Alto firewall sends its logs to a Linux server, where they are stored in a directory. I then edit a .conf file and set a field called "path" to the location of the logs, and the ELK solution proceeds with the normalization and parsing of the logs.
I only know that Kibana is at version 7.6.0 and I understand that Elasticsearch and Logstash are at the same version.
In the directory /etc/logstash/conf.d/ there is a file named paloalto.conf; its contents are shown below, after my question:
Could you help me identify what causes the delay in the processing of the logs by ELK?
input {
  # Tail the syslog file that receives the Palo Alto firewall logs.
  file {
    # Tag every event so the filter and output sections can select it.
    tags => [ "PAN-OS_SysLog" ]
    # File(s) to read.
    path => [ "xxx/xxx/xxx/xxx/xx/user.log" ]
    # Skip compressed (rotated) files that match the path.
    exclude => [ "*.gz" ]
    # Where to start reading in a matched file: from its beginning.
    start_position => "beginning"
  }
}
filter {
  # Parse Palo Alto (PAN-OS) syslog lines tagged "PAN-OS_SysLog" by the input:
  #   1. split the comma-separated line into named fields, per log type;
  #   2. convert numeric fields and tag the event with its log type;
  #   3. set "host" to the firewall's DeviceName when that field was parsed;
  #   4. geolocate public SourceIP / DestinationIP addresses.
  if "PAN-OS_SysLog" in [tags] {
    # Log types are "TRAFFIC", "THREAT", "CONFIG" and "SYSTEM". URL & Wildfire logs are inside Threat logs
    # Log fields: https://www.paloaltonetworks.com/documentation/80/pan-os/pan-os/monitoring/syslog-field-descriptions
    # NOTE(review): each test matches the keyword anywhere in the line and the
    # first matching branch wins, so a line of one type that merely contains
    # another type's keyword is parsed with the wrong column list.
    if [message] =~ /TRAFFIC/ {
      csv {
        source => "message"
        columns => [
          "FUTURE_USE", "ReceiveTime", "SerialNumber", "Type", "Threat_ContentType", "FUTURE_USE",
          "GeneratedTime", "SourceIP", "DestinationIP", "NATSourceIP", "NATDestinationIP", "RuleName",
          "SourceUser", "DestinationUser", "Application", "VirtualSystem", "SourceZone", "DestinationZone",
          "InboundInterface", "OutboundInterface", "LogForwardingProfile", "TimeLogged", "SessionID",
          "RepeatCount", "SourcePort", "DestinationPort", "NATSourcePort", "NATDestinationPort", "Flags",
          "Protocol", "Action", "Bytes", "BytesSent", "BytesReceived", "Packets", "StartTime", "ElapsedTime",
          "URLCategory", "FUTURE_USE", "SequenceNumber", "ActionFlags", "SourceLocation",
          "DestinationLocation", "FUTURE_USE", "PacketsSent", "PacketsReceived", "SessionEndReason",
          "DeviceGroupHierarchyLevel1", "DeviceGroupHierarchyLevel2", "DeviceGroupHierarchyLevel3",
          "DeviceGroupHierarchyLevel4", "VirtualSystemName", "DeviceName", "ActionSource", "SourceVMUUID",
          "DestinationVMUUID", "TunnelID_IMSI", "MonitorTag_IMEI", "ParentSessionID", "ParentStartTime",
          "TunnelType"
        ]
      }
      mutate {
        # Only fields produced by the csv filter above are listed. The former
        # "GeoIP.*" entries named fields that never exist on the event (a dotted
        # name is not a nested reference, and geoip runs later with other
        # targets), so they were no-ops and have been dropped.
        convert => {
          "Bytes" => "integer"
          "BytesReceived" => "integer"
          "BytesSent" => "integer"
          "ElapsedTime" => "integer"
          "NATDestinationPort" => "integer"
          "NATSourcePort" => "integer"
          "Packets" => "integer"
          "PacketsReceived" => "integer"
          "PacketsSent" => "integer"
          "SequenceNumber" => "integer"
        }
        add_tag => [ "PAN-OS_Traffic" ]
        remove_field => [ "message" ]
      }
      # ruby {
      # code => "event['GeneratedTime'] = event['GeneratedTime'].localtime('+08:00')"
      # }
    }
    else if [message] =~ /THREAT/ {
      csv {
        source => "message"
        # NOTE(review): "dest_ser" looks like a typo for "dest_user"; it is kept
        # unchanged because renaming it would change the indexed field name.
        columns => [
          "FUTURE_USE", "receive_time", "serial_number", "type", "threat_category", "version",
          "GeneratedTime", "src_ip", "dest_ip", "src_translated_ip", "dest_translated_ip", "rule",
          "src_user", "dest_ser", "application", "virtual_system", "src_zone", "dest_zone",
          "src_interface", "dest_interface", "LogForwardingProfile", "FUTURE_USE", "session_id",
          "repeat_count", "source_port", "dest_port", "src_translated_port", "dest_translated_port", "session_flags",
          "protocol", "vendor_action", "misc", "threat", "raw_category", "severity", "direction",
          "sequence_number", "action_flags", "client_location", "dest_location", "FUTURE_USE",
          "ContentType", "pcap_id", "file_digest", "Cloud", "url_index", "user_agent", "file_type",
          "X-Forwarded-For", "referer", "sender", "subject", "recipient", "FUTURE_USE",
          "DeviceGroupHierarchyLevel1", "DeviceGroupHierarchyLevel2", "DeviceGroupHierarchyLevel3",
          "DeviceGroupHierarchyLevel4", "vsys_name", "DeviceName", "FUTURE_USE", "SourceVMUUID",
          "DestinationVMUUID", "HTTPMethod", "TunnelID_IMSI", "MonitorTag_IMEI", "ParentSessionID",
          "ParentStartTime", "TunnelType", "category", "content_version", "FUTURE_USE", "FUTURE_USE",
          "FUTURE_USE", "FUTURE_USE"
        ]
      }
      mutate {
        # The former convert entries here ("GeoIP.*", "NATDestinationPort",
        # "NATSourcePort", "SequenceNumber") named fields that this branch's
        # column list never creates, so they were no-ops and have been dropped.
        add_tag => [ "PAN-OS_Threat" ]
        remove_field => [ "message" ]
      }
    }
    else if [message] =~ /CONFIG/ {
      csv {
        source => "message"
        columns => [
          "FUTURE_USE", "ReceiveTime", "Serial_Number", "Type", "Subtype", "FUTURE_USE", "GeneratedTime", "Host",
          "Virtual_System", "Command", "Admin", "Client", "Result", "Configuration_Path", "Sequence_Number",
          "Action_Flags", "Before_Change_Detail", "After_Change_Detail", "Device Group Hierarchy Level 1",
          "Device Group Hierarchy Level 2", "Virtual_System_Name", "DeviceName"
        ]
      }
      mutate {
        add_tag => [ "PAN-OS_Config" ]
        remove_field => [ "message" ]
      }
    }
    else if [message] =~ /CORRELATION/ {
      # Correlation lines are only tagged; no csv parsing is done for them, so
      # no DeviceName field exists and "host" is left as set by the input.
      mutate {
        add_tag => [ "PAN-OS_Correlation" ]
      }
    }
    else if [message] =~ /SYSTEM/ {
      csv {
        source => "message"
        columns => [
          "FUTURE_USE", "ReceiveTime", "Serial_Number", "Type", "Content/Threat_Type", "FUTURE_USE", "GeneratedTime",
          "Virtual_System", "EventID", "Object", "FUTURE_USE", "FUTURE_USE", "Module", "Severity", "Description",
          "Sequence_Number", "Action_Flags", "Device Group Hierarchy Level 1", "Device Group Hierarchy Level 2",
          "Device Group Hierarchy Level 3", "Device Group Hierarchy Level 4", "Virtual_System_Name", "DeviceName", "Bytes", "Bytes Sent"
        ]
      }
      mutate {
        add_tag => [ "PAN-OS_System" ]
        # The original message is intentionally kept for SYSTEM logs.
        #remove_field => ["message"]
      }
    }

    # Use the firewall's device name as "host", but only when a DeviceName field
    # was actually parsed. Without this guard an unresolved "%{DeviceName}" is
    # written literally into "host" (this always happened for CORRELATION lines,
    # which are not csv-parsed, and for lines too short to reach that column).
    if [DeviceName] {
      mutate {
        replace => { "host" => "%{DeviceName}" }
      }
    }

    # Geolocate logs that have SourceIP if that SourceIP is a non-RFC1918 address.
    # The former "0,0" cleanup tested [SourceIPGeo.location], which refers to a
    # top-level field literally named "SourceIPGeo.location" rather than the
    # nested [SourceIPGeo][location]; it could never match and has been removed.
    if [SourceIP] and [SourceIP] !~ "(^127\.0\.0\.1)|(^10\.)|(^172\.1[6-9]\.)|(^172\.2[0-9]\.)|(^172\.3[0-1]\.)|(^192\.168\.)|(^169\.254\.)" {
      geoip {
        source => "SourceIP"
        target => "SourceIPGeo"
      }
    }

    # Geolocate logs that have DestinationIP and if that DestinationIP is a non-RFC1918 address.
    # The former "0,0" cleanup on [DestinationIPGeo.location] was removed for the
    # same reason as the SourceIP one: its condition could never be true.
    if [DestinationIP] and [DestinationIP] !~ "(^127\.0\.0\.1)|(^10\.)|(^172\.1[6-9]\.)|(^172\.2[0-9]\.)|(^172\.3[0-1]\.)|(^192\.168\.)|(^169\.254\.)" {
      geoip {
        source => "DestinationIP"
        target => "DestinationIPGeo"
      }
    }
  }
}
output {
  # Ship only the Palo Alto events (tagged by the file input) to Elasticsearch.
  if "PAN-OS_SysLog" in [tags] {
    elasticsearch {
      hosts => [ "localhost:9200" ]
      # One index per day; the date part is formatted from the event's @timestamp.
      index => "paloalto-%{+yyyy.MM.dd}"
    }
  }
}