The perennial auditd event correlation problem


#1

Many, many people over the years have tried to find a sensible way of reducing the several records that Linux auditd produces for one action to a single, meaningful event. audisp, for example, will produce syslog messages of type SYSCALL, CWD, PATH, PATH, PROCTITLE and EOE for a single file open, correlated by a msg=audit(1521726040.236:659) key-value pair in each message. Here's an example.

auditbeat seems to do a really good job of taking these events directly from the kernel (over unicast or multicast netlink) and producing a single "someone opened this file" event, which is transmitted to one of several outputs: Elasticsearch, Logstash, Kafka, Redis, File or Console. I'd love to use it, but alas, in my environment I have to transmit syslog from all of the devices on the network.

I'm really excited about the whole concept of an Elastic Common Schema, too. I'd really like our cluster to end up storing data in this format if we can (we may also have to store the data somewhere in its original format for evidential purposes).

My options, I guess, are:

  1. Fork auditbeat to add a syslog output type
  2. Correlate from syslog in Logstash using the msg kv pair and an in-memory data store
  3. Trap the syslog messages somewhere in between using a custom syslog client and some sort of message bus.
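
To make option 2 a bit more concrete, here is a minimal Ruby sketch (run outside Logstash) of correlating records by the msg=audit(...) key with an in-memory store. The names `AUDIT_ID`, `correlate` and `SAMPLE` are hypothetical, the sample lines are made up for illustration, and the sketch assumes that an EOE record arrives last for each ID; it does no timeout handling for groups that never see one.

```ruby
# Captures the correlation key, e.g. "1521726040.236:659".
AUDIT_ID = /msg=audit\((\d+\.\d+:\d+)\)/

# Groups audit lines by audit ID and releases a group when its EOE record arrives.
# Returns [completed groups, groups still waiting for EOE].
def correlate(lines)
  pending = Hash.new { |h, k| h[k] = [] } # in-memory store keyed by audit ID
  complete = []
  lines.each do |line|
    id = line[AUDIT_ID, 1]
    next unless id # not an audit record

    type = line[/\btype=(\w+)/, 1]
    if type == 'EOE'
      complete << { 'auditID' => id, 'records' => pending.delete(id) || [] }
    else
      pending[id] << line
    end
  end
  [complete, pending]
end

# Illustrative input only; real records carry many more fields.
SAMPLE = [
  'type=SYSCALL msg=audit(1521726040.236:659): arch=c000003e syscall=2 success=yes',
  'type=CWD msg=audit(1521726040.236:659): cwd="/root"',
  'type=PATH msg=audit(1521726040.236:659): item=0 name="/etc/passwd"',
  'type=PROCTITLE msg=audit(1521726040.236:659): proctitle="cat"',
  'type=EOE msg=audit(1521726040.236:659): ',
  'type=SYSCALL msg=audit(1521726041.100:660): arch=c000003e syscall=2 success=yes'
].freeze

complete, pending = correlate(SAMPLE)
puts "complete: #{complete.map { |g| g['auditID'] }.inspect}"
puts "pending:  #{pending.keys.inspect}"
```

Anything doing this for real would also need to expire groups whose EOE never shows up, which is the hard part over lossy syslog transports.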

Has anyone else had to jump through these hoops and found a way to provide SOC analysts with easily searchable, accessible data from SYSCALL events?


#2
filter {
  dissect { mapping => { "message" => "type=%{type} msg=audit(%{timestamp}.%{millifraction}:%{something}): %{restofline}" } }
  mutate { add_field => { "auditID" => "%{timestamp}.%{millifraction}:%{something}" } }
  date { match => [ "timestamp", "UNIX" ] target => "audittime" }
  kv { source => "restofline" target => "kvps" }

  if [type] in [ "SYSCALL", "EXECVE", "CWD", "PATH" ] {
    aggregate {
      task_id => "%{auditID}"
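      code => "
        # Collect the key-value pairs of each record under the shared audit ID.
        t = event.get('type')
        # Create the PATH list once, so PATH records that are processed
        # before their SYSCALL record are not lost.
        map['PATH'] ||= []
        if t == 'SYSCALL'
          map['timestamp'] = event.get('audittime')
          map['SYSCALL'] = event.get('kvps')
        end
        if t == 'EXECVE'
          map['EXECVE'] = event.get('kvps')
        end
        if t == 'CWD'
          map['CWD'] = event.get('kvps')
        end
        if t == 'PATH'
          map['PATH'] << event.get('kvps')
        end"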
      push_map_as_event_on_timeout => true
      timeout_task_id_field => "auditID"
      timeout => 10 # 10 seconds
      timeout_code => "
        event.set('SYSCALL', map['SYSCALL']);
        event.set('EXECVE', map['EXECVE']);
        event.set('CWD', map['CWD']);
        event.set('PATH', map['PATH'])"
    }
    mutate { remove_field => [ "message", "kvps" ] }
#   drop {}
  }
}
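
To show roughly what shape of event the filter above builds, here is a standalone Ruby sketch that mimics the dissect, kv and aggregate steps on a few made-up lines. It is not Logstash code: `LINE`, `parse_kv` and `build_maps` are hypothetical stand-ins, the kv parsing is much cruder than the real kv filter, and the 10-second timeout is not modelled.

```ruby
# Regex mirroring the dissect mapping: type, timestamp, millifraction, something, restofline.
LINE = /\Atype=(?<type>\S+) msg=audit\((?<timestamp>\d+)\.(?<millifraction>\d+):(?<something>\d+)\): ?(?<restofline>.*)\z/

# Crude stand-in for the kv filter: key=value pairs, values optionally double-quoted.
def parse_kv(text)
  text.scan(/(\w+)=("[^"]*"|\S+)/).to_h { |k, v| [k, v.delete('"')] }
end

# Builds one map per audit ID, as the aggregate code block does.
def build_maps(lines)
  maps = Hash.new { |h, k| h[k] = { 'PATH' => [] } }
  lines.each do |line|
    m = LINE.match(line)
    next unless m && %w[SYSCALL EXECVE CWD PATH].include?(m[:type])

    id = "#{m[:timestamp]}.#{m[:millifraction]}:#{m[:something]}"
    kvps = parse_kv(m[:restofline])
    if m[:type] == 'PATH'
      maps[id]['PATH'] << kvps # a syscall can have several PATH records
    else
      maps[id][m[:type]] = kvps
    end
  end
  maps
end

# Illustrative input only.
SAMPLE_LINES = [
  'type=SYSCALL msg=audit(1521726040.236:659): syscall=2 success=yes exe="/usr/bin/cat"',
  'type=CWD msg=audit(1521726040.236:659): cwd="/root"',
  'type=PATH msg=audit(1521726040.236:659): item=0 name="/etc/passwd"',
  'type=PATH msg=audit(1521726040.236:659): item=1 name="/lib64/ld-linux-x86-64.so.2"',
  'type=PROCTITLE msg=audit(1521726040.236:659): proctitle="cat"'
].freeze

merged = build_maps(SAMPLE_LINES)['1521726040.236:659']
puts merged.inspect
```

The merged hash has one entry each for SYSCALL and CWD plus a PATH array, which is the structure an analyst would then search on.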

#3

Thanks, Badger,

I'll have a play around with that.

