Nested KV filter in Logstash

Hi,

Here is a line from my audit.log file:
"type=CRED_DISP msg=audit(1525254633.039:542208): pid=35761 uid=0 auid=4294967295 ses=4294967295 msg='op=PAM:setcred grantors=pam_env,pam_fprintd acct="root" exe="/usr/bin/sudo" hostname=? addr=? terminal=? res=success'

I would like to parse the above line into key-value pairs. The problem is the "msg" field, which appears twice. I used the basic KV filter and got the output below.

{
"message" => "type=CRED_DISP msg=audit(1525254633.039:542208): pid=35761 uid=0 auid=4294967295 ses=4294967295 msg='op=PAM:setcred grantors=pam_env,pam_fprintd acct="root" exe="/usr/bin/sudo" hostname=? addr=? terminal=? res=success'",
"@version" => "1",
"host" => "localhost",
"ses" => "4294967295",
"path" => "/var/log/audit/audit.log",
"type" => "CRED_DISP",
"auid" => "4294967295",
"msg" => [
[0] "audit(1525254633.039:542208):",
[1] "op=PAM:setcred grantors=pam_env,pam_fprintd acct="root" exe="/usr/bin/sudo" hostname=? addr=? terminal=? res=success"
],
"pid" => "35761",
"uid" => "0",
"@timestamp" => 2018-05-02T09:51:25.626Z
}

Here is my Logstash configuration:

input {
file {
path => "/var/log/audit/audit.log"
start_position => "beginning"
}
}
filter {
if [path] == "/var/log/audit/audit.log" {

kv {
  value_split => "="
}

}
}
output {
if [path] == "/var/log/audit/audit.log" {
stdout { codec => rubydebug }
}
}

I would like output something like the following:

{
"message" => "type=CRED_DISP msg=audit(1525254633.039:542208): pid=35761 uid=0 auid=4294967295 ses=4294967295 msg='op=PAM:setcred grantors=pam_env,pam_fprintd acct="root" exe="/usr/bin/sudo" hostname=? addr=? terminal=? res=success'",
"@version" => "1",
"host" => "localhost",
"ses" => "4294967295",
"path" => "/var/log/audit/audit.log",
"type" => "CRED_DISP",
"auid" => "4294967295",
"msg" => "audit(1525254633.039:542208):",
"op" => "PAM:setcred",
"grantors" => "pam_env,pam_fprintd",
"acct" => "\"root\"",
"exe" => "\"/usr/bin/sudo\"",
"hostname" => "?",
"addr" => "?",
"terminal" => "?",
"res" => "success",
"pid" => "35761",
"uid" => "0",
"@timestamp" => 2018-05-02T09:51:25.626Z
}

The KV filter plugin has a recursive option, but the duplicated msg key makes it problematic here: a recursive parse would try to create sub-keys under msg, and that conflicts with the other msg value, which is a plain string with no sub-keys.
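For reference, this is roughly what enabling that option looks like. It is only a sketch to make the option concrete, and for the reason just described it is not expected to produce the desired result on this log line:

```
filter {
  kv {
    source => "message"
    # recursive is a boolean kv option (default false); when enabled,
    # values that themselves look like key=value pairs are parsed too
    recursive => true
  }
}
```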

If msg is the only duplicated key, and one of the duplicates always holds single-quoted nested key-value pairs, I think we can get what you're after by renaming that occurrence before the message reaches the KV filter, which avoids the collision:

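filter {
  # Rename the quoted msg='...' occurrence so it no longer collides with msg=audit(...)
  mutate {
    gsub => ["message", " msg='", " _nested='"]
  }
  # First pass: parse the top-level pairs; the quoted block lands in _nested
  kv { source => "message" }
  # Second pass: parse the pairs inside _nested, then drop the temporary field
  kv {
    source => "_nested"
    remove_field => "_nested"
  }
}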

With the above on Logstash 6.2.2, I get the following. Note that the KV filter captures the contents of quoted values but not the quotes themselves. The leading quote in the "\"type" key comes from the double quote at the start of the sample line.

{
           "exe" => "/usr/bin/sudo",
           "pid" => "35761",
      "hostname" => "?",
            "op" => "PAM:setcred",
      "grantors" => "pam_env,pam_fprintd",
    "@timestamp" => 2018-05-03T00:45:05.782Z,
           "uid" => "0",
          "addr" => "?",
           "ses" => "4294967295",
          "acct" => "root",
           "msg" => "audit(1525254633.039:542208):",
      "terminal" => "?",
       "message" => "\"type=CRED_DISP msg=audit(1525254633.039:542208): pid=35761 uid=0 auid=4294967295 ses=4294967295 _nested='op=PAM:setcred grantors=pam_env,pam_fprintd acct=\"root\" exe=\"/usr/bin/sudo\" hostname=? addr=? terminal=? res=success'",
           "res" => "success",
      "@version" => "1",
          "auid" => "4294967295",
          "host" => "castrovel.local",
        "\"type" => "CRED_DISP"
}
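If the nested keys might ever clash with top-level ones, a variation on the same filter is to collect them under their own field using the kv target option. This is a sketch rather than something verified against this exact log; the field name audit_detail is a placeholder made up for the example:

```
filter {
  # Same rename as before, so the two msg occurrences do not collide
  mutate {
    gsub => ["message", " msg='", " _nested='"]
  }
  kv { source => "message" }
  kv {
    source => "_nested"
    # "audit_detail" is a hypothetical field name; the nested pairs
    # should then appear as [audit_detail][op], [audit_detail][res], etc.
    target => "audit_detail"
    remove_field => ["_nested"]
  }
}
```

The trade-off is that downstream filters and queries have to reference the nested path instead of a flat key such as op.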

Thank you for your help, it's working as I expected.

Thanks for sharing, very helpful.
