I have a grok filter that I have built and tested on http://grokdebug.herokuapp.com.
the test sample is:
<190>3259426: *Dec 7 17:43:09.599 UTC: %SEC-6-IPACCESSLOGP: list ACL_3P denied tcp 10.10.10.2(8000) -> 10.20.20.2(59752), 1 packet
the filter is:
${CISCOIOSLOG}
#patterns
CISCOIOSTIMESTAMP %{MONTH} +%{MONTHDAY} %{TIME}( %{TZ})?
CISCOIOSLOG %{WORD:log_facility}-%{INT:log_severity}-%{WORD:log_mnemonic}:
%{NUMBER:log_sequence_no}: \*%{CISCOIOSTIMESTAMP:acl_timestamp}: \%%{CISCOIOSLOG} list %{DATA:ACL_name} %{WORD:ACL_action} %{WORD:proto} %{IP:src_ip}\(%{INT:src_port}\) -> %{IP:dst_ip}\(%{INT:dst_port}\)\, %{INT:packets} packet
CISCOINTERFACE %{CISCO_REASON:facility}.%{LOGLEVEL:log_level}: %{NUMBER:log_sequence_no}: \*%{CISCOIOSTIMESTAMP:acl_timestamp}: \%%{CISCOIOSLOG} Line protocol on Interface %{INTERFACE:interface}, changed state to %{WORD:state}
and this works, I have added the patterns to a file in "/opt/logstash/patterns" and then my Logstash conf is below:
#
# INPUT - Logstash listens on port 5000 for these logs.
#
input {
udp {
port => 1514
type => "cisco-syslog"
}
tcp {
port => 1514
type => "cisco-syslog"
}
}
filter {
# NOTE: The frontend logstash servers set the type of incoming messages.
if [type] == "cisco-syslog" {
# The switches are sending the same message to all syslog servers for redundancy, this allows us to
## only store the message in elasticsearch once by generating a hash of the message and using that as
## the document_id.
fingerprint {
source => [ "message" ]
method => "SHA1"
key => "839a-3SD2-498d-314f-l18H"
concatenate_sources => true
}
# Parse the log entry into sections. Cisco doesn't use a consistent log format, unfortunately.
grok {
# There are a couple of custom patterns associated with this filter.
patterns_dir => [ "/opt/logstash/patterns" ]
match => [
# IOS ACL
"message", "%{CISCOIOSACL}",
# IOS
"message", "%{CISCOINTERFACE}",
"message", "%{CISCO_DEFAULT}",
# IOS
"message", "%{SYSLOG5424PRI}(%{NUMBER:log_sequence#})?:( %{NUMBER}:)? %{CISCOTIMESTAMPTZ:log_date}: %%{CISCO_REASON:facility}-%{INT:severity_level}-%{CISCO_REASON:facility_mnemonic}: %{GREEDYDATA:message}",
"message", "%{SYSLOG5424PRI}(%{NUMBER:log_sequence#})?:( %{NUMBER}:)? %{CISCOTIMESTAMPTZ:log_date}: %%{CISCO_REASON:facility}-%{CISCO_REASON:facility_sub}-%{INT:severity_level}-%{CISCO_REASON:facility_mnemonic}: %{GREEDYDATA:message}",
# Nexus
"message", "%{SYSLOG5424PRI}(%{NUMBER:log_sequence#})?: %{NEXUSTIMESTAMP:log_date}: %%{CISCO_REASON:facility}-%{INT:severity_level}-%{CISCO_REASON:facility_mnemonic}: %{GREEDYDATA:message}",
"message", "%{SYSLOG5424PRI}(%{NUMBER:log_sequence#})?: %{NEXUSTIMESTAMP:log_date}: %%{CISCO_REASON:facility}-%{CISCO_REASON:facility_sub}-%{INT:severity_level}-%{CISCO_REASON:facility_mnemonic}: %{GREEDYDATA:message}"
]
overwrite => [ "message" ]
add_tag => [ "cisco" ]
remove_field => [ "syslog5424_pri", "@version" ]
}
}
# If we made it here, the grok was sucessful
if "cisco" in [tags] {
date {
match => [
"log_date",
# IOS
"MMM dd HH:mm:ss.SSS ZZZ",
"MMM dd HH:mm:ss ZZZ",
"MMM dd HH:mm:ss.SSS",
# Nexus
"YYYY MMM dd HH:mm:ss.SSS ZZZ",
"YYYY MMM dd HH:mm:ss ZZZ",
"YYYY MMM dd HH:mm:ss.SSS",
# Hail marry
"ISO8601"
]
}
# Add the log level's name instead of just a number.
mutate {
gsub => [
"severity_level", "0", "0 - Emergency",
"severity_level", "1", "1 - Alert",
"severity_level", "2", "2 - Critical",
"severity_level", "3", "3 - Error",
"severity_level", "4", "4 - Warning",
"severity_level", "5", "5 - Notification",
"severity_level", "6", "6 - Informational"
]
}
# Translate the short facility name into a full name.
# NOTE: This is a third party plugin: logstash-filter-translate
translate {
field => "facility"
destination => "facility_full"
dictionary => [
"AAA", "Authentication, authorization, and accounting",
"AAA_CACHE", "Authentication, authorization, and accounting cache",
... output ommited
]
} # translate
} # if
} # filter
output {
# Something went wrong with the grok parsing, don't discard the messages though
if "_grokparsefailure" in [tags] {
file {
path => "/tmp/fail-%{type}-%{+YYYY.MM.dd}.log"
}
}
# The message was parsed correctly, and should be sent to elasicsearch.
if "cisco" in [tags] {
#file {
# path => "/tmp/%{type}-%{+YYYY.MM.dd}.log"
#}
elasticsearch {
hosts => "elasticsearch:9200"
manage_template => false
index => "network-%{+YYYY.MM.dd}"
document_type => "%{type}"
document_id => "%{fingerprint}"
}
}
}
all the logs are just ending up in /tmp/fail-cisco-syslog-2017.12.07.log and are not being processed.
sample from the logs:
{"@timestamp":"2017-12-07T17:45:57.208Z","@version":"1","host":"192.168.44.1","fingerprint":"03871ef40c9a56e019a09fd18194dd71762df96b","message":"<190>3259426: *Dec 7 17:43:09.599 UTC: %SEC-6-IPACCESSLOGP: list ACL_3P denied tcp 10.10.10.2(8000) -> 10.10.20.2(59752), 1 packet ","type":"cisco-syslog","tags":["_grokparsefailure"]}