Looking at the filter I assume you are also trying to use ECS? I fear you would get a mapping error anyway if one event puts a plain value into [host] and another one puts it into [host][ip].
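For illustration, a minimal sketch of the conflict (made-up values): once Elasticsearch has mapped [host] from a document like the first one, a document like the second one is rejected with a mapper_parsing_exception, because a field cannot be a concrete value in one document and an object in the next:

{ "host": "10.1.2.3" }
{ "host": { "ip": "10.1.2.3" } }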
In my experience the syslog input plugin sometimes acts strangely and is a little picky - so I switched to the tcp and udp input plugins and built the filters mostly on my own to handle all kinds of different syslog patterns. For that I also use an additional patterns file:
/etc/logstash/patterns/syslog_patterns
# for reference: match => { "message" => "\<%{NONNEGINT:priority}\>(%{NONNEGINT} | )(%{SYSLOGTIMESTAMP:log_timestamp}|%{TIMESTAMP_ISO8601:log_timestamp}) %{SYSLOGHOST:syslog_host} %{SYSLOGPROG:syslogprog}(: | )%{GREEDYDATA:message}" }
SYSLOG5424PRINTASCII [!-~]+
SYSLOGBASE2 (?:%{SYSLOGTIMESTAMP:log_timestamp}|%{TIMESTAMP_ISO8601:log_timestamp}) (?:%{SYSLOGFACILITY} )?%{SYSLOGHOST:syslog_host}+(?: %{SYSLOGPROG:syslogprog}:|)
SYSLOGPAMSESSION %{SYSLOGBASE} (?=%{GREEDYDATA:message})%{WORD:syslog_pam_module}\(%{DATA:syslog_pam_caller}\): session %{WORD:syslog_pam_session_state} for user %{USERNAME:syslog_username}(?: by %{GREEDYDATA:syslog_pam_by})?
CRON_ACTION [A-Z ]+
CRONLOG %{SYSLOGBASE} \(%{USER:syslog_user}\) %{CRON_ACTION:syslog_action} \(%{DATA:message}\)
SYSLOGPRI <%{NONNEGINT:priority}>
# IETF 5424 syslog(8) format (see http://www.rfc-editor.org/info/rfc5424)
SYSLOG5424SD \[%{DATA}\]+
SYSLOG5424BASE %{SYSLOGPRI}%{NONNEGINT:syslog_ver} +(?:%{TIMESTAMP_ISO8601:syslog5424_ts}|-) +(?:%{IPORHOST:syslog_host}|-) +(-|%{SYSLOG5424PRINTASCII:syslog5424_app}) +(-|%{SYSLOG5424PRINTASCII:syslog5424_proc}) +(-|%{SYSLOG5424PRINTASCII:syslog5424_msgid}) +(?:%{SYSLOG5424SD:syslog5424_sd}|-|)
SYSLOG5424LINE %{SYSLOG5424BASE} +%{GREEDYDATA:syslog5424_msg}
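# should match e.g. this sample message (adapted from RFC 5424, section 6.5, without the BOM):
# <34>1 2003-10-11T22:14:15.003Z mymachine.example.com su - ID47 - 'su root' failed for lonvick on /dev/pts/8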
# IETF 3164 syslog format
SYSLOGLINE (?:%{SYSLOGPRI})%{SYSLOGBASE2} %{GREEDYDATA:message}
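# should match e.g. this sample message (from RFC 3164, section 5.4):
# <34>Oct 11 22:14:15 mymachine su: 'su root' failed for lonvick on /dev/pts/8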
# unix epoch time
UNIXEPOCH (\d){10}
UNIXEPOCHMS1 ((\d){10}\.(\d){3})
UNIXEPOCHMS2 (\d){13}
SYSLOGBASEUNIX (?:%{UNIXEPOCH:log_timestamp}|%{UNIXEPOCHMS1:log_timestamp}|%{UNIXEPOCHMS2:log_timestamp}) (?:%{SYSLOGFACILITY} )?%{SYSLOGHOST:syslog_host}+(?: %{SYSLOGPROG:syslogprog}:|)
SYSLOGLINEUNIX (?:%{SYSLOGPRI})%{SYSLOGBASEUNIX} %{GREEDYDATA:message}
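If you want to sanity-check the patterns file before wiring it into the real pipeline, a throwaway pipeline with the stdin input works well - a minimal sketch (run it with bin/logstash -f, paste sample lines and inspect the parsed result):

input { stdin {} }
filter {
grok {
patterns_dir => ["/etc/logstash/patterns/"]
match => { "message" => "%{SYSLOGLINE}" }
}
}
output { stdout { codec => rubydebug } }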
The Logstash pipeline config file looks similar to this (I had to remove/change some stuff before posting - I hope I didn't remove too much):
/etc/logstash/conf.d/filter-syslog.conf
input {
tcp {
port => 1514
# we need to change that some day
type => "syslog"
}
udp {
port => 1514
# we need to change that some day
type => "syslog"
buffer_size => 8192
codec => plain
}
}
filter {
# grok attempt for syslog RFC5424 or RFC3164
if "_grokparsesuccess" not in [tags] {
# Manually parse the log, as we want to support both RFC3164 and RFC5424
grok {
# load slightly changed default syslog patterns
patterns_dir => ["/etc/logstash/patterns/"]
match => { "message" => "%{SYSLOG5424LINE}" }
add_field => { "received_from" => "%{host}" }
add_tag => [ "_grokparsesuccess" ]
overwrite => [ "message" ]
}
if [syslog5424_ts] {
# Handle RFC5424 formatted Syslog messages
mutate {
remove_field => [ "message", "host" ]
add_tag => [ "syslog5424" ]
}
mutate {
# Use a friendlier naming scheme
rename => {
"syslog5424_app" => "syslogprog"
"syslog5424_msg" => "message"
"syslog5424_host" => "syslog_host"
}
remove_field => [ "syslog5424_ver", "syslog5424_proc" ]
}
if [syslog5424_sd] {
# All structured data needs to be in format [key=value,key=value,...]
mutate {
# split at "][" brackets
split => { "syslog5424_sd" => "][" }
}
mutate {
# Remove any brackets in this array-field
gsub => [ "syslog5424_sd", "[\[\]]", "" ]
}
mutate {
rename => { "syslog5424_sd" => "[log][syslog][syslog5424][sd_data]" }
}
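# e.g. (sample structured data from RFC 5424): '[exampleSDID@32473 iut="3"][examplePriority@32473 class="high"]'
# ends up as the array [ 'exampleSDID@32473 iut="3"', 'examplePriority@32473 class="high"' ]
# in [log][syslog][syslog5424][sd_data]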
}
date {
match => [ "syslog5424_ts", "ISO8601" ]
remove_field => [ "syslog5424_ts", "timestamp" ]
}
}
else {
# Handle RFC3164 formatted Syslog messages
grok {
# load slightly changed default syslog patterns
patterns_dir => ["/etc/logstash/patterns/"]
match => { "message" => "%{SYSLOGLINE}" }
add_field => { "received_from" => "%{host}" }
# both tags have to go into a single add_tag - duplicate option keys are not merged
add_tag => [ "_grokparsesuccess", "syslog3164" ]
overwrite => [ "message" ]
}
}
}
# grok attempt for syslog messages with epoch-timestamps
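# e.g. <13>1626276025 myhost myapp: some test message (a made-up example)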
if "_grokparsesuccess" not in [tags] {
# Manually parse syslog messages that use a unix epoch timestamp
grok {
# load slightly changed default syslog patterns
patterns_dir => ["/etc/logstash/patterns/"]
match => { "message" => "%{SYSLOGLINEUNIX}" }
add_field => { "received_from" => "%{host}" }
add_tag => [ "_grokparsesuccess", "syslogunixepoch" ]
overwrite => [ "message" ]
}
}
# last grok attempt, for syslog messages in the simplest format - e.g. sent via an old version of the logger command
# <5>Jul 14 15:20:25 root: some test message
if "_grokparsesuccess" not in [tags] {
# Manually parse very simple syslog messages that carry no host field
grok {
match => { "message" => "\<%{NONNEGINT:priority}\>(%{SYSLOGTIMESTAMP:log_timestamp}|%{TIMESTAMP_ISO8601:log_timestamp}) %{SYSLOGPROG}: %{GREEDYDATA:message}" }
add_field => { "received_from" => "%{host}" }
add_tag => [ "_grokparsesuccess", "simple_syslog" ]
overwrite => [ "message" ]
}
}
# this will replace @timestamp with the timestamp from the event, if log_timestamp is in one of the known formats
# (one date filter with several patterns instead of one filter per pattern keeps the failure tagging sane)
date {
match => [ "log_timestamp",
"MMM dd yyyy HH:mm:ss", "MMM d yyyy HH:mm:ss",
"ISO8601",
"MMM dd HH:mm:ss", "MMM d HH:mm:ss",
"UNIX", "UNIX_MS" ]
add_tag => [ "_dateparsesuccess" ]
remove_field => [ "log_timestamp" ]
}
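# examples of timestamps these formats accept (made-up values):
# "Jul 14 2021 15:20:25", "2021-07-14T15:20:25.000Z", "Jul 14 15:20:25",
# "1626276025" (UNIX) and "1626276025123" (UNIX_MS)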
# because all grok filters are attempted until one succeeds, the event would also carry a '_grokparsefailure' tag
# from the attempts that didn't match - we don't need it if at least one of the groks parsed the message
if ("_grokparsesuccess" in [tags]) {
# syslog_pri extracts facility and severity from the "priority" field
syslog_pri { syslog_pri_field_name => "priority" }
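# e.g. priority 13 = facility 1 (user-level) * 8 + severity 5 (notice)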
mutate{
remove_tag => [ "_grokparsefailure" ]
}
}
###########################################################
###### Final preps for ECS
# remove the _dateparsefailure if we find a success-tag
if ("_dateparsesuccess" in [tags]) {
mutate {
remove_tag => [ "_dateparsefailure" ]
}
}
# we have to remove the original host field first so we can reuse the field name in ECS style below
# it seems we cannot remove and re-add it in the same mutate statement!
if "_grokparsesuccess" in [tags] {
mutate {
remove_field => [ "host" ]
}
# in simple_syslog messages there is no explicit syslog_host :|
if "simple_syslog" not in [tags] {
mutate {
# fill the ECS host.ip and host.name fields (arrays in Elasticsearch)
add_field => { "[host][ip]" => "%{received_from}" }
add_field => { "[host][name]" => "%{syslog_host}" }
}
} else {
mutate {
# fill the ECS host.ip and host.name fields (arrays in Elasticsearch)
add_field => { "[host][ip]" => "%{received_from}" }
add_field => { "[host][name]" => "%{received_from}" }
}
}
} else {
# if no grok matched, we have to get the stuff from the host-field
mutate {
add_field => { "received_from" => "%{host}" }
}
# we have to remove the initial host-field
mutate {
remove_field => [ "host" ]
}
# we have to add this to the host.ip array
mutate {
add_field => { "[host][ip]" => "%{received_from}" }
# as we do not have a hostname because of bad parsing we have to leave the host.name field empty
}
}
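# in the end the event carries e.g. (made-up values): "host" => { "ip" => "10.1.2.3", "name" => "myhost" }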
# to keep with ECS we mirror the syslog severity into log.level
if [syslog_severity] {
mutate {
add_field => { "[log][level]" => "%{syslog_severity}" }
}
}
# finally we rename and remove fields
mutate {
# we can rename our simple string/text/number-fields ### better - change later https://www.elastic.co/guide/en/ecs/current/ecs-log.html
rename => {
"pid" => "[process][pid]"
"program" => "[process][name]"
"syslogprog" => "[log][logger]"
"priority" => "[log][syslog][priority]"
"syslog_facility" => "[log][syslog][facility][name]"
"syslog_facility_code" => "[log][syslog][facility][code]"
"syslog_severity" => "[log][syslog][severity][name]"
"syslog_severity_code" => "[log][syslog][severity][code]"
"syslog_ver" => "[log][syslog][version]"
"received_at" => "[event][created]"
"loglevel" => "[log][level]"
}
# we remove unneeded fields whose info we already have somewhere else:
# syslog_host is in host.name/host.ip if applicable, received_at became event.created,
# received_from is in host.ip if applicable, and type is duplicated into event.dataset below
# (note: everything has to go into a single remove_field - duplicate option keys are not merged)
remove_field => [ "syslog_host", "received_at", "received_from", "type" ]
# we add event.dataset and event.type so the SIEM part in Kibana looks nice,
# and ecs.version as another ECS best practice
# (add_field is applied before remove_field, so "%{type}" still resolves here)
add_field => {
"[event][dataset]" => "%{type}"
"[event][type]" => "%{type}"
"[ecs][version]" => "1.5.0"
}
}
}
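# after the filter a parsed event should look roughly like this (rubydebug style, made-up values):
# "@timestamp" => 2021-07-14T13:20:25.000Z,
# "message" => "some test message",
# "host" => { "ip" => "10.1.2.3", "name" => "myhost" },
# "event" => { "dataset" => "syslog", "type" => "syslog" },
# "log" => { "logger" => "myapp", "level" => "notice", "syslog" => { ... } },
# "tags" => [ "_grokparsesuccess", "syslog3164", "_dateparsesuccess" ]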
output {
elasticsearch {...}
}