How to parse a mail log from syslog

Hi,
I have a .txt file containing mail logs from syslog. I tried to parse the log with grok, but failed.

The source row:

2018-03-14 13:56:17 local0.info ms1.zootek.com.tw CEF:0|Cellopoint|CelloOS|4.1.5|FILTER_LOG|Filter by mail processor|0|gid=O6OWII00060:P5KGHS10XF lid=1979183008 src=192.168.100.71 cs1=0.000 spt=0 dst=192.168.50.163 dpt=0 suser=wip@tera.com.tw duser=INBOUND;11106;emily@zootek.com.tw;ZOO00963|Emily Yan(EmilyYan);zootek::zootek::User::OS::PC;zootek::zootek::User::OS;zootek::zootek::User;zootek::zootek;zootek;License duser=INBOUND;11106;kelly@zootek.com.tw;ZOO02833|Kelly Li(Kelly Li);zootek::zootek::User::OS::PC;zootek::zootek::User::OS;zootek::zootek::User;zootek::zootek;zootek;License rt=1521006975 fsize=14876 msg=New product for you and 50% discount cs6=????????????????????????????????? cs3=0050047006_20180314.xls cn1=SYSTEM,AUDIT uuid=895d99c727da42f0879482895344fa56-20180314 cs5={"audit\/tw.name":1}\r\n

I just want to get a few key columns (time, sender, receiver list, subject, attached file, and size):
time: 2018-03-14 13:56:17
Sender (suser): wip@tera.com.tw
Receivers (duser): emily@zootek.com.tw, kelly@zootek.com.tw
Subject (msg): New product for you and 50% discount
Attached file (cs3): 0050047006_20180314.xls
Size (fsize): 14876

Sometimes there are more than two receivers; there may be many more.
How can I get all of the receiver information with grok?

Thanks a lot.

Re-format

2018-03-14 13:56:17 local0.info ms1.zootek.com.tw CEF:0|Cellopoint|CelloOS|4.1.5|FILTER_LOG|Filter by mail processor|0|
gid=O6OWII00060:P5KGHS10XF lid=1979183008
src=192.168.100.71 cs1=0.000 spt=0
dst=192.168.50.163 dpt=0
suser=wip@tera.com.tw
duser=INBOUND;11106;emily@zootek.com.tw;ZOO00963|Emily Yan(EmilyYan);zootek::zootek::User::OS::PC;zootek::zootek::User::OS;zootek::zootek::User;zootek::zootek;zootek;License
duser=INBOUND;11106;kelly@zootek.com.tw;ZOO02833|Kelly Li(Kelly Li);zootek::zootek::User::OS::PC;zootek::zootek::User::OS;zootek::zootek::User;zootek::zootek;zootek;License
rt=1521006975
fsize=14876
msg=New product for you and 50% discount
cs6=?????????????????????????????????
cs3=0050047006_20180314.xls
cn1=SYSTEM,AUDIT
uuid=895d99c727da42f0879482895344fa56-20180314
cs5={"audit\/tw.name":1}\r\n

It'd be helpful to know how far you got, and where exactly you got stuck :smile:

Disclaimer: I'm making a number of educated guesses here, since I only have one log line to get started with, but this is the approach I would take if I were processing those logs.

It looks like this log message was composed in layers:

  • something logged a key/value structure
  • something prefixed each message with pipe-delimited metadata
  • syslog prefixed each message with a timestamp, syslog facility, syslog level, and hostname

So it's going to be best to peel those layers off one by one to get to the data you care about.
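
Annotated against the sample line, the layers look roughly like this (the boundaries are my guess from the one example):

layer 3 (syslog):  2018-03-14 13:56:17 local0.info ms1.zootek.com.tw
layer 2 (pipes):   CEF:0|Cellopoint|CelloOS|4.1.5|FILTER_LOG|Filter by mail processor|0|
layer 1 (kv):      gid=O6OWII00060:P5KGHS10XF lid=1979183008 src=192.168.100.71 ... cs5={"audit\/tw.name":1}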

The outermost layer is easy enough to match with a grok pattern; I used Grok Constructor's Incremental Mode to come up with this pattern:

\A%{TIMESTAMP_ISO8601} %{WORD}\.%{LOGLEVEL} %{SYSLOGHOST} %{GREEDYDATA}

We can use logstash-filter-grok to peel off the data that syslog added into appropriately-named keys on the event, and greedily capture the rest to store temporarily in the event's [@metadata]:

# filter {
  grok {
    match => {
      "message" => "\A%{TIMESTAMP_ISO8601:syslog_timestamp} %{WORD:syslog_facility}\.%{LOGLEVEL:syslog_level} %{SYSLOGHOST:syslog_host} %{GREEDYDATA:[@metadata][message]}"
    }
  }
# ...
# }

Since we extracted a timestamp, let's use logstash-filter-date to set the event's timestamp:

    date {
      match => ["syslog_timestamp", "yyyy-MM-dd HH:mm:ss"]
    }
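
One caveat: that timestamp string carries no timezone, so the date filter interprets it in the local zone of the machine running Logstash. If your mail server logs in a different zone, you can pin it explicitly (the zone below is just an example; substitute your own):

    date {
      match => ["syslog_timestamp", "yyyy-MM-dd HH:mm:ss"]
      # assumption: the mail server logs local time in this zone; adjust to yours
      timezone => "Asia/Taipei"
    }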

The next layer looks neatly pipe-delimited, so let's use logstash-filter-dissect to split up the rest of the message that we captured in [@metadata][message] into its component parts; I guessed a bit about the names of the components since I only had one line to look at, but this should give you a general idea:

# filter {
# ...
  dissect {
    mapping => {
      "[@metadata][message]" => "%{cef}|%{program}|%{os}|%{version}|%{category}|%{desc}|%{zero}|%{[@metadata][kv]}"
    }
  }
# ...
# }
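
To make the mapping concrete, on the sample line the dissect above yields these fields (you can see the same values in the rubydebug output further down):

  cef      => "CEF:0"
  program  => "Cellopoint"
  os       => "CelloOS"
  version  => "4.1.5"
  category => "FILTER_LOG"
  desc     => "Filter by mail processor"
  zero     => "0"

with everything after the final pipe landing in [@metadata][kv].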

Now we're down to that last layer, the key=value pairs we just put into [@metadata][kv], and we can use logstash-filter-kv to capture all key/value pairs into an object called mail:

# filter {
# ...
  kv {
    source => "[@metadata][kv]"
    target => "mail"
    # since values can have spaces, we need to define a field split pattern that won't break up values
    # a field splitter is any [SPACE] that is followed by [end-of-input] or [alphanumerics followed by an equals sign]
    field_split_pattern => " (?=$|[a-z0-9]+=)"
  }
# ...
# }
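
If you want to sanity-check the split pattern in isolation first, a minimal throwaway pipeline like this one (my own scaffold, not part of the final config) lets you paste fragments on stdin and inspect how they split:

input { stdin {} }
filter {
  kv {
    # same pattern as above: split only on a space followed by end-of-input
    # or by something that looks like a key
    field_split_pattern => " (?=$|[a-z0-9]+=)"
    target => "mail"
  }
}
output { stdout { codec => rubydebug } }

Pasting in suser=wip@tera.com.tw msg=New product for you and 50% discount fsize=14876 should yield three keys under [mail], with the spaces inside msg left intact.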

The "field_split_pattern" option was recently added in v4.1.0 of logstash-filter-kv, so you may need to update the plugin in order for this to work

Now, you said that you wanted to extract the e-mail addresses out of the [mail][duser] field, but the values look something like this:

INBOUND;11106;emily@zootek.com.tw;ZOO00963|Emily Yan(EmilyYan);zootek::zootek::User::OS::PC;zootek::zootek::User::OS;zootek::zootek::User;zootek::zootek;zootek;License
INBOUND;11106;kelly@zootek.com.tw;ZOO02833|Kelly Li(Kelly Li);zootek::zootek::User::OS::PC;zootek::zootek::User::OS;zootek::zootek::User;zootek::zootek;zootek;License

We can grok the e-mail addresses out and place them in [mail][duser_email]; since the kv filter collected the repeated duser keys into an array, the grok filter matches against each element and produces a matching array of addresses:

# filter {
# ...
  grok {
    match => {
      "[mail][duser]" => ";%{EMAILADDRESS:[mail][duser_email]};"
    }
  }
# ...
# }

If we put that all together with some safety checks and comments, we get the following:

input {
  stdin {}
}
filter {
  grok {
    # extract syslog metadata, placing rest in event's `[@metadata][message]`
    match => {
      "message" => "\A%{TIMESTAMP_ISO8601:syslog_timestamp} %{WORD:syslog_facility}\.%{LOGLEVEL:syslog_level} %{SYSLOGHOST:syslog_host} %{GREEDYDATA:[@metadata][message]}"
    }
  }
  if "_grokparsefailure" not in [tags] {
    # set the event's timestamp by parsing the timestamp we extracted
    date {
      match => ["syslog_timestamp", "yyyy-MM-dd HH:mm:ss"]
    }
    # extract top-level pipe-delimited data, placing rest in event's `[@metadata][kv]`
    dissect {
      mapping => {
        "[@metadata][message]" => "%{cef}|%{program}|%{os}|%{version}|%{category}|%{desc}|%{zero}|%{[@metadata][kv]}"
      }
    }
    if "_dissectfailure" not in [tags] {
      kv {
        source => "[@metadata][kv]"
        target => "mail"
        # since values can have spaces, we need to define a field split pattern that won't break up values
        # a field splitter is any [SPACE] that is followed by [end-of-input] or [something that looks like a key]
        field_split_pattern => " (?=$|[a-z0-9]+=)"
      }
      grok {
        match => {
          "[mail][duser]" => ";%{EMAILADDRESS:[mail][duser_email]};"
        }
      }
    }
  }
}
output {
  stdout {
    codec => rubydebug
  }
}

Now, if I cat the contents of input.log to logstash with this pipeline, we get a rich event that has a bunch of useful data, including everything you wanted extracted :smile::

╭─{ yaauie@castrovel:~/src/elastic/discuss-scratch/124287-mail-syslog }
╰─○ cat input.log | ~/src/elastic/releases/logstash-6.2.2/bin/logstash -f pipeline.conf
Sending Logstash's logs to /Users/yaauie/src/elastic/releases/logstash-6.2.2/logs which is now configured via log4j2.properties
[2018-03-16T17:17:59,313][INFO ][logstash.modules.scaffold] Initializing module {:module_name=>"fb_apache", :directory=>"/Users/yaauie/src/elastic/releases/logstash-6.2.2/modules/fb_apache/configuration"}
[2018-03-16T17:17:59,348][INFO ][logstash.modules.scaffold] Initializing module {:module_name=>"netflow", :directory=>"/Users/yaauie/src/elastic/releases/logstash-6.2.2/modules/netflow/configuration"}
[2018-03-16T17:17:59,656][WARN ][logstash.config.source.multilocal] Ignoring the 'pipelines.yml' file because modules or command line options are specified
[2018-03-16T17:18:00,339][INFO ][logstash.runner          ] Starting Logstash {"logstash.version"=>"6.2.2"}
[2018-03-16T17:18:00,777][INFO ][logstash.agent           ] Successfully started Logstash API endpoint {:port=>9600}
[2018-03-16T17:18:07,195][INFO ][logstash.pipeline        ] Starting pipeline {:pipeline_id=>"main", "pipeline.workers"=>8, "pipeline.batch.size"=>125, "pipeline.batch.delay"=>50}
[2018-03-16T17:18:07,586][INFO ][logstash.pipeline        ] Pipeline started succesfully {:pipeline_id=>"main", :thread=>"#<Thread:0x7ac593af run>"}
[2018-03-16T17:18:07,742][INFO ][logstash.agent           ] Pipelines running {:count=>1, :pipelines=>["main"]}
{
    "syslog_timestamp" => "2018-03-14 13:56:17",
                 "cef" => "CEF:0",
            "category" => "FILTER_LOG",
             "program" => "Cellopoint",
                "mail" => {
               "uuid" => "895d99c727da42f0879482895344fa56-20180314",
                "src" => "192.168.100.71",
                "cs5" => "{\"audit\\/tw.name\":1}\\r\\n",
                "lid" => "1979183008",
              "suser" => "wip@tera.com.tw",
        "duser_email" => [
            [0] "emily@zootek.com.tw",
            [1] "kelly@zootek.com.tw"
        ],
              "fsize" => "14876",
                "msg" => "New product for you and 50% discount",
                "gid" => "O6OWII00060:P5KGHS10XF",
                "cs6" => "?????????????????????????????????",
                "cs1" => "0.000",
                "spt" => "0",
                "dpt" => "0",
              "duser" => [
            [0] "INBOUND;11106;emily@zootek.com.tw;ZOO00963|Emily Yan(EmilyYan);zootek::zootek::User::OS::PC;zootek::zootek::User::OS;zootek::zootek::User;zootek::zootek;zootek;License",
            [1] "INBOUND;11106;kelly@zootek.com.tw;ZOO02833|Kelly Li(Kelly Li);zootek::zootek::User::OS::PC;zootek::zootek::User::OS;zootek::zootek::User;zootek::zootek;zootek;License"
        ],
                "dst" => "192.168.50.163",
                 "rt" => "1521006975",
                "cs3" => "0050047006_20180314.xls",
                "cn1" => "SYSTEM,AUDIT"
    },
          "@timestamp" => 2018-03-14T13:56:17.000Z,
        "syslog_level" => "info",
             "version" => "4.1.5",
                "desc" => "Filter by mail processor",
     "syslog_facility" => "local0",
                "host" => "castrovel.local",
         "syslog_host" => "ms1.zootek.com.tw",
                "zero" => "0",
            "@version" => "1",
                  "os" => "CelloOS",
             "message" => "2018-03-14 13:56:17 local0.info ms1.zootek.com.tw CEF:0|Cellopoint|CelloOS|4.1.5|FILTER_LOG|Filter by mail processor|0|gid=O6OWII00060:P5KGHS10XF lid=1979183008 src=192.168.100.71 cs1=0.000 spt=0 dst=192.168.50.163 dpt=0 suser=wip@tera.com.tw duser=INBOUND;11106;emily@zootek.com.tw;ZOO00963|Emily Yan(EmilyYan);zootek::zootek::User::OS::PC;zootek::zootek::User::OS;zootek::zootek::User;zootek::zootek;zootek;License duser=INBOUND;11106;kelly@zootek.com.tw;ZOO02833|Kelly Li(Kelly Li);zootek::zootek::User::OS::PC;zootek::zootek::User::OS;zootek::zootek::User;zootek::zootek;zootek;License rt=1521006975 fsize=14876 msg=New product for you and 50% discount cs6=????????????????????????????????? cs3=0050047006_20180314.xls cn1=SYSTEM,AUDIT uuid=895d99c727da42f0879482895344fa56-20180314 cs5={\"audit\\/tw.name\":1}\\r\\n"
}
[2018-03-16T17:18:09,040][INFO ][logstash.pipeline        ] Pipeline has terminated {:pipeline_id=>"main", :thread=>"#<Thread:0x7ac593af run>"}
[success]

Hi yaauie,
Thanks for the explanation and reply; it's very helpful. But when I test the config, I get an error (Unknown setting 'field_split_pattern' for kv). Do I need to upgrade Logstash from 6.1.1 to 6.2.2?
I compared the kv plugin docs between Logstash 6.1.1 and 6.2.2, and only the 6.2.2 version documents the 'field_split_pattern' parameter.

Thanks.

https://www.elastic.co/guide/en/logstash/6.1/plugins-filters-kv.html
https://www.elastic.co/guide/en/logstash/current/plugins-filters-kv.html


[2018-03-17T16:46:19,332][ERROR][logstash.filters.kv ] Unknown setting 'field_split_pattern' for kv
[2018-03-17T16:46:19,340][ERROR][logstash.agent ] Failed to execute action {:action=>LogStash::PipelineAction::Create/pipeline_id:main, :exception=>"LogStash::ConfigurationError", :message=>"Something is wrong with your configuration.", :backtrace=>["/usr/share/logstash/logstash-core/lib/logstash/config/mixin.rb:89:in `config_init'", "/usr/share/logstash/logstash-core/lib/logstash/filters/base.rb:128:in `initialize'", "/usr/share/logstash/logstash-core/lib/logstash/filter_delegator.rb:23:in `initialize'", "/usr/share/logstash/logstash-core/lib/logstash/plugins/plugin_factory.rb:88:in `plugin'", "/usr/share/logstash/logstash-core/lib/logstash/pipeline.rb:114:in `plugin'", "(eval):91:in'", "org/jruby/RubyKernel.java:994:in `eval'", "/usr/share/logstash/logstash-core/lib/logstash/pipeline.rb:86:in `initialize'", "/usr/share/logstash/logstash-core/lib/logstash/pipeline.rb:171:in `initialize'", "/usr/share/logstash/logstash-core/lib/logstash/pipeline_action/create.rb:40:in `execute'", "/usr/share/logstash/logstash-core/lib/logstash/agent.rb:335:in `block in converge_state'", "/usr/share/logstash/logstash-core/lib/logstash/agent.rb:141:in `with_pipelines'", "/usr/share/logstash/logstash-core/lib/logstash/agent.rb:332:in `block in converge_state'", "org/jruby/RubyArray.java:1734:in `each'", "/usr/share/logstash/logstash-core/lib/logstash/agent.rb:319:in `converge_state'", "/usr/share/logstash/logstash-core/lib/logstash/agent.rb:166:in `block in converge_state_and_update'", "/usr/share/logstash/logstash-core/lib/logstash/agent.rb:141:in `with_pipelines'", "/usr/share/logstash/logstash-core/lib/logstash/agent.rb:164:in `converge_state_and_update'", "/usr/share/logstash/logstash-core/lib/logstash/agent.rb:90:in `execute'", "/usr/share/logstash/logstash-core/lib/logstash/runner.rb:343:in `block in execute'", "/usr/share/logstash/vendor/bundle/jruby/2.3.0/gems/stud-0.0.23/lib/stud/task.rb:24:in `block in initialize'"]}

As I mentioned, the logstash-filter-kv plugin has been updated; from the Working with Plugins docs:

bin/logstash-plugin update logstash-filter-kv
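
To confirm which version you ended up with, the plugin manager can also list installed plugins with their versions (check bin/logstash-plugin --help on your release if the flag isn't recognized):

bin/logstash-plugin list --verbose logstash-filter-kv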

Hi yaauie,
I upgraded the kv plugin to 4.1.0 and tested the config. The INBOUND records are OK, but the OUTBOUND records get a _grokparsefailure error. I compared the INBOUND and OUTBOUND records: the sender (suser) and receiver (duser) formats are reversed. How should I modify the field_split_pattern value, or add another grok pattern, so that both parse?

Thanks a lot.

Outbound record

2018-03-14 13:55:44 local0.info ms2.zootek.com.tw CEF:0|Cellopoint|CelloOS|4.1.5|FILTER_LOG|Filter by mail processor|0|gid=O6NHK900040:P5KGGV0PQ4 lid=165515681 src=192.168.100.71 cs1=0.000 spt=0 dst=192.168.50.163 dpt=0 suser=zootek_b2b@zootek.com.tw;zootek_b2b|Zootek_B2B;zootek::Zootek::User::????????????;Zootek::Zootek::User;zootek::Zootek;zootek;License duser=OUTBOUND;11101;David@hospital.com duser=OUTBOUND;11101;Jan@hospital.com rt=1521006942 fsize=2578 msg=[Deal OK]bus the some foods cs6=B2B?????????????????? cs3=ORDER-List_20180314134143.csv cn1=SYSTEM,AUDIT uuid=93dff4ebe28744ca861019262027436b-20180314 cs5={}\r\n

Re-format

2018-03-14 13:55:44 local0.info ms2.zootek.com.tw
CEF:0|Cellopoint|CelloOS|4.1.5|FILTER_LOG|Filter by mail processor|0|
gid=O6NHK900040:P5KGGV0PQ4 lid=165515681 src=192.168.100.71 cs1=0.000 spt=0 dst=192.168.50.163 dpt=0 suser=zootek_b2b@zootek.com.tw;zootek_b2b|Zootek_B2B;zootek::Zootek::User::????????????;Zootek::Zootek::User;zootek::Zootek;zootek;License duser=OUTBOUND;11101;David@hospital.com duser=OUTBOUND;11101;Jan@hospital.com rt=1521006942 fsize=2578 msg=[Deal OK]bus the some foods cs6=B2B?????????????????? cs3=ORDER-List_20180314134143.csv cn1=SYSTEM,AUDIT uuid=93dff4ebe28744ca861019262027436b-20180314 cs5={}\r\n

@Pinno_Lin Have you tried pasting the failing line into the Grok Constructor? It's a great tool for building and debugging patterns.

Hi yaauie,
I fixed the outbound issue by adding a new grok pattern; it now parses the outbound records too. Thanks.

grok {
  match => [
    "[mail][duser]", ";%{EMAILADDRESS:[mail][duser_email]};",
    "[mail][duser]", ";%{EMAILADDRESS:[mail][duser_email]}"
  ]
}
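
For what it's worth, a single pattern with an alternation on the terminator should also cover both layouts in one grok (an untested sketch: the address is always preceded by a semicolon and followed by either another semicolon or the end of the value):

grok {
  match => {
    "[mail][duser]" => ";%{EMAILADDRESS:[mail][duser_email]}(?:;|$)"
  }
}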
