It'd be helpful to know how far you got, and where exactly you got stuck.
Disclaimer: I'm making a number of educated guesses here, since I only have one log line to get started with, but this is the approach I would take if I were processing those logs.
It looks like this log message was composed in layers:
- something logged a key/value structure
- something prefixed each message with pipe-delimited metadata
- syslog prefixed each message with a timestamp, syslog facility, syslog level, and hostname
So, it's going to be best to peel those layers off one by one to get to the data you care about.
The outermost layer is easy enough to match with a grok pattern; I used Grok Constructor's Incremental Mode to come up with this pattern:

```
\A%{TIMESTAMP_ISO8601} %{WORD}\.%{LOGLEVEL} %{SYSLOGHOST} %{GREEDYDATA}
```
We can use logstash-filter-grok to peel off the data that syslog added into appropriately-named keys on the event, and greedily capture the rest into the event's `[@metadata]` for temporary storage:
```
# filter {
  grok {
    match => {
      "message" => "\A%{TIMESTAMP_ISO8601:syslog_timestamp} %{WORD:syslog_facility}\.%{LOGLEVEL:syslog_level} %{SYSLOGHOST:syslog_host} %{GREEDYDATA:[@metadata][message]}"
    }
  }
  # ...
# }
```
Since we extracted a timestamp, let's use logstash-filter-date to set the event's timestamp:
```
  date {
    match => ["syslog_timestamp", "yyyy-MM-dd HH:mm:ss"]
  }
```
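As a sanity check, here's a quick Ruby sketch of the timestamp shape that Joda-style pattern corresponds to; the sample value is made up, and you may need to adjust the pattern (e.g. add a `T` separator or a timezone) to match what your syslog actually emits:

```ruby
require 'time'

# "yyyy-MM-dd HH:mm:ss" (Joda) corresponds roughly to strptime's
# "%Y-%m-%d %H:%M:%S"; the sample timestamp below is hypothetical
t = Time.strptime("2018-05-02 14:03:07", "%Y-%m-%d %H:%M:%S")

puts t.year
```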
The next layer looks neatly pipe-delimited, so let's use logstash-filter-dissect to split the rest of the message that we captured into `[@metadata][message]` into its component parts; I guessed a bit about the names of the components since I only had one line to look at, but this should give you a general idea:
```
# filter {
  # ...
  dissect {
    mapping => {
      "[@metadata][message]" => "%{cef}|%{program}|%{os}|%{version}|%{category}|%{desc}|%{zero}|%{[@metadata][kv]}"
    }
  }
  # ...
# }
```
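To illustrate what that dissect mapping does, here's a rough Ruby equivalent; the sample message and its field values are invented, since I don't know what your real payloads look like:

```ruby
# a made-up message in the same shape as [@metadata][message]
msg = "CEF:0|SomeVendor MailApp|Linux|1.2.3|email|Mail delivered|0|suser=John Doe duser=jane@example.com"

# the eight %{...} fields in the dissect mapping behave like a split on "|"
# with a limit of 8: the final field keeps the remainder intact
cef, program, os, version, category, desc, zero, kv = msg.split("|", 8)

puts kv # => suser=John Doe duser=jane@example.com
```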
Now we're down to that last layer, the key=value pairs we just put into `[@metadata][kv]`, and we can use logstash-filter-kv to capture all key/value pairs into an object called `mail`:
```
# filter {
  # ...
  kv {
    source => "[@metadata][kv]"
    target => "mail"
    # since values can have spaces, we need to define a field split pattern
    # that won't break up values: a field splitter is any [SPACE] that is
    # followed by [end-of-input] or [alphanumerics followed by an equals sign]
    field_split_pattern => " (?=$|[a-z0-9]+=)"
  }
  # ...
# }
```
The `field_split_pattern` option was recently added in v4.1.0 of logstash-filter-kv, so you may need to update the plugin in order for this to work.
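To show why that lookahead matters, here's a small Ruby demonstration of the same split pattern against a made-up key/value payload (the keys are hypothetical):

```ruby
# values contain spaces, so a plain split on " " would break them up
sample = "from=alerts@example.com suser=John Doe msg=mail delivered ok"

# split only on a space that is followed by end-of-input or by
# something that looks like a key (alphanumerics then "=")
pairs = sample.split(/ (?=$|[a-z0-9]+=)/)

pairs.each { |pair| puts pair }
# => from=alerts@example.com
# => suser=John Doe
# => msg=mail delivered ok
```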
Now, you said that you wanted to extract the e-mail addresses out of the `[mail][duser]` field, but the values look something like this:

```
INBOUND;11106;emily@zootek.com.tw;ZOO00963|Emily Yan(EmilyYan);zootek::zootek::User::OS::PC;zootek::zootek::User::OS;zootek::zootek::User;zootek::zootek;zootek;License
INBOUND;11106;kelly@zootek.com.tw;ZOO02833|Kelly Li(Kelly Li);zootek::zootek::User::OS::PC;zootek::zootek::User::OS;zootek::zootek::User;zootek::zootek;zootek;License
```
We can grok the e-mail address out and place it in `[mail][duser_email]`:
```
# filter {
  # ...
  grok {
    match => {
      "[mail][duser]" => ";%{EMAILADDRESS:[mail][duser_email]};"
    }
  }
  # ...
# }
```
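Outside of Logstash, the same extraction can be sketched in Ruby, with a plain regex standing in for the `EMAILADDRESS` grok pattern (my stand-in is much looser than the real pattern):

```ruby
# one of the sample [mail][duser] values from above
duser = "INBOUND;11106;emily@zootek.com.tw;ZOO00963|Emily Yan(EmilyYan);" \
        "zootek::zootek::User::OS::PC;zootek::zootek::User::OS;" \
        "zootek::zootek::User;zootek::zootek;zootek;License"

# capture the first ";"-delimited token that looks like an e-mail address
email = duser[/;([^;@\s]+@[^;@\s]+);/, 1]

puts email # => emily@zootek.com.tw
```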
If we put that all together with some safety checks and comments, we get the following:
```
input {
  stdin {}
}

filter {
  grok {
    # extract syslog metadata, placing the rest in the event's `[@metadata][message]`
    match => {
      "message" => "\A%{TIMESTAMP_ISO8601:syslog_timestamp} %{WORD:syslog_facility}\.%{LOGLEVEL:syslog_level} %{SYSLOGHOST:syslog_host} %{GREEDYDATA:[@metadata][message]}"
    }
  }
  if "_grokparsefailure" not in [tags] {
    # set the event's timestamp by parsing the timestamp we extracted
    date {
      match => ["syslog_timestamp", "yyyy-MM-dd HH:mm:ss"]
    }
    # extract top-level pipe-delimited data, placing the rest in the event's `[@metadata][kv]`
    dissect {
      mapping => {
        "[@metadata][message]" => "%{cef}|%{program}|%{os}|%{version}|%{category}|%{desc}|%{zero}|%{[@metadata][kv]}"
      }
    }
    if "_dissectfailure" not in [tags] {
      kv {
        source => "[@metadata][kv]"
        target => "mail"
        # since values can have spaces, we need to define a field split pattern
        # that won't break up values: a field splitter is any [SPACE] that is
        # followed by [end-of-input] or [something that looks like a key]
        field_split_pattern => " (?=$|[a-z0-9]+=)"
      }
      grok {
        match => {
          "[mail][duser]" => ";%{EMAILADDRESS:[mail][duser_email]};"
        }
      }
    }
  }
}

output {
  stdout {
    codec => rubydebug
  }
}
```