Kv filter plugin

Hello,

I am playing around with Logstash filters to parse logs from auditd. It's a challenge to write a filter that catches them all without going crazy with conditional logic.

What I am trying to accomplish is to send data into the kv plugin to manipulate it. Since some logs are hairier than others, I see the need to use two kv plugins.

My problem is that I cannot get past one of the kv filters by adding ignore_missing => true, as is done in the kv processor in Elasticsearch. Is my only option a conditional if?
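
To illustrate, the only workaround I can think of is wrapping the second kv in a conditional, roughly like this (an untested sketch using the field names from my config below):

kv {
    source => "auditd.log.kv"
    target => "temp_kv"
}
# only run the second kv when grok actually extracted the source field
if [auditd.log.sub_kv] {
    kv {
        source => "auditd.log.sub_kv"
        target => "auditd.log"
    }
}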

It would be easier to understand what you're doing if you'd give an example.

Of course. I am trying to parse logs from auditd, and I can't get my filter logic in order. The following two log lines are used:

1. type=MAC_IPSEC_EVENT msg=audit(1518601161.689:23606): op=SAD-delete auid=4294967295 ses=4294967295 subj=system_u:system_r:ipsec_t:s0 src=10.203.133.11 dst=10.203.133.13 spi=3781041342(0xe15e1cbe) res=1
2. type=USER_START msg=audit(1518603422.661:23666): pid=12269 uid=0 auid=1000 ses=123 subj=unconfined_u:unconfined_r:unconfined_t:s0-s0:c0.c1023 msg='op=PAM:session_open grantors=pam_keyinit,pam_limits acct="root" exe="/usr/bin/sudo" hostname=? addr=? terminal=/dev/pts/0 res=success'

I have gotten these patterns from Elasticsearch's processor (my colleague did some editing the first time around, and now I am trying my best at it):

grok {
    pattern_definitions => {
        "AUDIT_TYPE" => "^type=%{NOTSPACE:auditd.log.record_type}"
        "AUDIT_PREFIX" => "%{AUDIT_TYPE} msg=audit\(%{NUMBER:auditd.log.epoch}:%{NUMBER:auditd.log.sequence}\):(%{DATA})?"
        "AUDIT_KEY_VALUES" => "%{WORD}=%{GREEDYDATA}"
    }
    match => { "message" => [
        "%{AUDIT_PREFIX} %{AUDIT_KEY_VALUES:auditd.log.kv} old auid=%{NUMBER:auditd.log.old_auid} new auid=%{NUMBER:auditd.log.new_auid} old ses=%{NUMBER:auditd.log.old_ses} new ses=%{NUMBER:auditd.log.new_ses}",
        "%{AUDIT_PREFIX} %{AUDIT_KEY_VALUES:auditd.log.kv} msg=['\"](%{DATA:auditd.log.msg}\s+)?%{AUDIT_KEY_VALUES:auditd.log.sub_kv}['\"]",
        "%{AUDIT_PREFIX} %{AUDIT_KEY_VALUES:auditd.log.kv}",
        "%{AUDIT_PREFIX}",
        "%{AUDIT_TYPE} %{AUDIT_KEY_VALUES:auditd.log.kv}"
    ]}
}
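
While fiddling with these patterns I have been testing with a minimal pipeline that reads single log lines from stdin and prints the parsed event, pasting the filter section above into it (test.conf is just my scratch file name):

input { stdin {} }
filter {
    # the grok / kv / mutate filters from this thread go here
}
output { stdout { codec => rubydebug } }

and running it with bin/logstash -f test.conf.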

When log 2 is matched by the second match pattern, the following filtering works: the log is parsed, and the values in auditd.log.kv and auditd.log.sub_kv are merged:

kv {
    source => "auditd.log.kv"
    target => "temp_kv"
    field_split => "\s+"
    remove_field => "auditd.log.kv"
}
kv {
    source => "auditd.log.sub_kv"
    target => "auditd.log"
    field_split => "\s+"
    remove_field => "auditd.log.sub_kv"
}
mutate {
    merge => {"auditd.log" => "temp_kv"}
    remove_field => "temp_kv"
}
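
One thing I am unsure about after reading the kv docs: field_split seems to be treated as a set of single characters rather than a regex, so my "\s+" may not mean what I intended. If an actual regex split is needed, I believe the option is field_split_pattern, something like:

kv {
    source => "auditd.log.kv"
    target => "temp_kv"
    field_split_pattern => "\s+"
    remove_field => "auditd.log.kv"
}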

Logstash produces correct output for log 1 when it is matched by the third match pattern, but it logs the following error:

[ERROR][logstash.filters.mutate  ] Not possible to merge an array and a hash:  {:dest_field=>"auditd.log", :added_field=>"temp_kv"}

I suspect that since auditd.log.sub_kv does not exist in this case, the second kv and the mutate create a hash or something; I am pretty new to this.
I have tried to put the second kv and the mutate into an:

if [auditd.log.sub_kv] {

}

But I end up with an auditd.log.sub_kv field in my output, and a multiline error:

[ERROR][logstash.agent           ] Internal API server error ...

The ignore_missing filter property would be great, or a working if conditional. I have tried wrapping my auditd.log.sub_kv reference in every kind of syntax (quotes, brackets, and a few combos), but I cannot find a way.

I managed to catch my log 1 without Logstash complaining with:

if [temp_kv] {
    mutate {
        merge => {"auditd.log" => "temp_kv"}
        remove_field => "temp_kv"
    }
}
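
Looking at it now, I also suspect that pointing both kv filters at the same target would sidestep the mutate merge (and the array-versus-hash error) entirely, since the second kv should simply do nothing when its source field is missing. An untested sketch:

kv {
    source => "auditd.log.kv"
    target => "auditd.log"
    field_split => "\s+"
    remove_field => "auditd.log.kv"
}
kv {
    source => "auditd.log.sub_kv"
    target => "auditd.log"
    field_split => "\s+"
    remove_field => "auditd.log.sub_kv"
}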

When pushing a few more log entries through the filters, I notice that some fields don't get caught correctly:

Log:

type=LOGIN msg=audit(1518602461.262:23631): pid=12196 uid=0 subj=system_u:system_r:crond_t:s0-s0:c0.c1023 old-auid=4294967295 auid=0 tty=(none) old-ses=4294967295 ses=122 res=1

Pattern from Elasticsearch:

"%{AUDIT_PREFIX} %{AUDIT_KEY_VALUES:auditd.log.kv} old auid=%{NUMBER:auditd.log.old_auid} new auid=%{NUMBER:auditd.log.new_auid} old ses=%{NUMBER:auditd.log.old_ses} new ses=%{NUMBER:auditd.log.new_ses}"

Pattern I had to create to catch the old-auid etc.:

"%{AUDIT_PREFIX} %{AUDIT_KEY_VALUES:auditd.log.kv} old-auid=%{NUMBER:auditd.log.old_auid} auid=%{NUMBER:auditd.log.auid} tty=[(]%{GREEDYDATA:auditd.log.tty}[)] old-ses=%{NUMBER:auditd.log.old_ses} ses=%{NUMBER:auditd.log.ses} res=%{NUMBER:auditd.log.res}"

As a new player in this game, I am not completely sure I HAVE to do this, since I'm working without Kibana at the moment, but my colleague tells me I probably should, since the auditd.log.old_auid field might be used in dashboards.
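
If I end up letting kv capture keys like old-auid and old-ses as-is instead of writing a grok pattern per variant, I could presumably rename them afterwards to the names the dashboards expect. A rough sketch, assuming the keys land under the auditd.log target:

mutate {
    rename => {
        "[auditd.log][old-auid]" => "[auditd.log][old_auid]"
        "[auditd.log][old-ses]"  => "[auditd.log][old_ses]"
    }
}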
