Logstash - Aggregate Filter

I'm working with auditd and I'm trying to take multiple events with the same auditID and combine them (by inserting a new document) into one document.

The end goal would be to have the original documents be submitted, but then have a final document which contains all of the unique fields.

I'm operating on the document a few ways (e.g., I'm shipping it via json format with rsyslog, using json and kv filters), but I'm having an issue working with nested values.

Here is a sample of some documents:

1st Log:

{
  "_index": "test-2017.11.16",
  "_type": "log",
  "_id": "aIeixl8BqbGtc_MpEfsx",
  "_version": 1,
  "_score": null,
  "_source": {
    "offset": 80594529,
    "input_type": "log",
    "source": "/data/var/log/remotehosts/ubuntu.2017-11-16.log",
    "event_data": {
      "syscall": "42",
      "gid": "0",
      "fsgid": "0",
      "programname": "audispd",
      "pid": "12109",
      "suid": "0",
      "type": "SYSCALL",
      "uid": "0",
      "egid": "0",
      "exe": "/usr/bin/wget",
      "audit": "1510866028.145:162548",
      "@version": "1",
      "fromhost-ip": "127.0.0.1",
      "sgid": "0",
      "sysloghost": "ubuntu",
      "inputname": "imuxsock",
      "key": "network_outbound6",
      "severity": "info",
      "ses": "4294967295",
      "auid": "4294967295",
      "comm": "wget",
      "euid": "0",
      "procid": "-",
      "message": " node=ubuntu type=SYSCALL audit=1510866028.145:162548 arch=c000003e syscall=42 success=yes exit=0 a0=4 a1=7fffffffda10 a2=10 a3=0 items=0 ppid=5201 pid=12109 auid=4294967295 uid=0 gid=0 euid=0 suid=0 fsuid=0 egid=0 sgid=0 fsgid=0 tty=pts19 ses=4294967295 comm=\"wget\" exe=\"/usr/bin/wget\" key=\"network_outbound6\"",
      "a0": "4",
      "ppid": "5201",
      "a1": "7fffffffda10",
      "fsuid": "0",
      "node": "ubuntu",
      "exit": "0",
      "a2": "10",
      "a3": "0",
      "@timestamp": "2017-11-16T16:00:28.194605-05:00",
      "success": "yes",
      "tty": "pts19",
      "arch": "c000003e",
      "facility": "user",
      "items": "0"
    },
    "message": "{\"@timestamp\":\"2017-11-16T16:00:28.194605-05:00\",\"@version\":\"1\",\"message\":\" node=ubuntu type=SYSCALL msg=audit(1510866028.145:162548): arch=c000003e syscall=42 success=yes exit=0 a0=4 a1=7fffffffda10 a2=10 a3=0 items=0 ppid=5201 pid=12109 auid=4294967295 uid=0 gid=0 euid=0 suid=0 fsuid=0 egid=0 sgid=0 fsgid=0 tty=pts19 ses=4294967295 comm=\\\"wget\\\" exe=\\\"\\/usr\\/bin\\/wget\\\" key=\\\"network_outbound6\\\"\",\"sysloghost\":\"ubuntu\",\"severity\":\"info\",\"facility\":\"user\",\"programname\":\"audispd\",\"procid\":\"-\",\"inputname\":\"imuxsock\",\"fromhost-ip\":\"127.0.0.1\"}",
    "type": "log",
    "tags": [
      "beats_input_codec_plain_applied"
    ],
    "insertTime": "2017-11-16T21:00:30.000Z",
    "@timestamp": "2017-11-16T21:00:28.832Z",
    "@version": "1",
    "beat": {
      "name": "ubuntu",
      "hostname": "ubuntu",
      "version": "5.6.3"
    },
    "host": "ubuntu"
  },
  "fields": {
    "insertTime": [
      "2017-11-16T21:00:30.000Z"
    ],
    "@timestamp": [
      "2017-11-16T21:00:28.832Z"
    ],
    "event_data.@timestamp": [
      "2017-11-16T21:00:28.194Z"
    ]
  },
  "highlight": {
    "event_data.audit": [
      "@kibana-highlighted-field@1510866028.145@/kibana-highlighted-field@:@kibana-highlighted-field@162548@/kibana-highlighted-field@"
    ]
  },
  "sort": [
    1510866030000
  ]
}

2nd Log:

{
  "_index": "test0-2017.11.16",
  "_type": "log",
  "_id": "aYeixl8BqbGtc_MpEfsx",
  "_version": 1,
  "_score": null,
  "_source": {
    "offset": 80594850,
    "input_type": "log",
    "source": "/data/var/log/remotehosts/ubuntu.2017-11-16.log",
    "event_data": {
      "severity": "info",
      "saddr": "02000050D02B66FA0000000000000000",
      "programname": "audispd",
      "procid": "-",
      "message": " node=ubuntu type=SOCKADDR audit=1510866028.145:162548 saddr=02000050D02B66FA0000000000000000",
      "type": "SOCKADDR",
      "node": "ubuntu",
      "@timestamp": "2017-11-16T16:00:28.194758-05:00",
      "audit": "1510866028.145:162548",
      "@version": "1",
      "fromhost-ip": "127.0.0.1",
      "sysloghost": "ubuntu",
      "inputname": "imuxsock",
      "facility": "user"
    },
    "message": "{\"@timestamp\":\"2017-11-16T16:00:28.194758-05:00\",\"@version\":\"1\",\"message\":\" node=ubuntu type=SOCKADDR msg=audit(1510866028.145:162548): saddr=02000050D02B66FA0000000000000000\",\"sysloghost\":\"ubuntu\",\"severity\":\"info\",\"facility\":\"user\",\"programname\":\"audispd\",\"procid\":\"-\",\"inputname\":\"imuxsock\",\"fromhost-ip\":\"127.0.0.1\"}",
    "type": "log",
    "tags": [
      "beats_input_codec_plain_applied"
    ],
    "insertTime": "2017-11-16T21:00:30.000Z",
    "@timestamp": "2017-11-16T21:00:28.832Z",
    "@version": "1",
    "beat": {
      "name": "ubuntu",
      "hostname": "ubuntu",
      "version": "5.6.3"
    },
    "host": "ubuntu"
  },
  "fields": {
    "insertTime": [
      "2017-11-16T21:00:30.000Z"
    ],
    "@timestamp": [
      "2017-11-16T21:00:28.832Z"
    ],
    "event_data.@timestamp": [
      "2017-11-16T21:00:28.194Z"
    ]
  },
  "highlight": {
    "event_data.audit": [
      "@kibana-highlighted-field@1510866028.145@/kibana-highlighted-field@:@kibana-highlighted-field@162548@/kibana-highlighted-field@"
    ]
  },
  "sort": [
    1510866030000
  ]
}

What I've been able to do, is try to use the aggregate filter in order to pivot off of the auditid (event_data.audit), by which I can identify multiple events associated with these log entries. Where I start having issues are around working with the nested "event_data" values, in order to try and insert/merge these documents together into a new document.

Here is what my current aggregate filter looks like:

aggregate {
                task_id => "%{[event_data][audit]}"
                code => "
                        map.merge!(event.get('[event_data]'))
                        event.to_hash.each do |key,value|
                                map[key] = value
                        end
                        "
                timeout_tags => ["custom_timeout_tag"]
                push_map_as_event_on_timeout => true
                timeout => 5
                }
            }

What this does, is create a similar document, however all the event_data now becomes top level fields, instead of being nested under event_data. I'm okay if it clobbers/overwrites duplicate data, just as long as the unique fields are sustained (e.g., in this example event_data.saddr and event_data.exe).

Is there a way to correctly call/insert into a nested field with the aggregate filter?

Also, another issue I found is that aggregate will not create an event with this simple filter:

aggregate {
                task_id => "%{[event_data][audit]}"
                code => "
                        map['[event_data]'] = event.get('[event_data]')
                        "
                timeout_tags => ["custom_timeout_tag"]
                push_map_as_event_on_timeout => true
                timeout => 5
                }
            }

Seems that a new event will only be created if I loop through the event this way:

aggregate {
                task_id => "%{[event_data][audit]}"
                code => "
                        event.to_hash.each do |key,value|
                                map[key] = value
                        end
                        "
                timeout_tags => ["custom_timeout_tag"]
                push_map_as_event_on_timeout => true
                timeout => 5
                }
            }

Which doesn't help me either way. I just noticed that I have to write out to the map variable in an odd way for it to create its own separate event.

Welp, after a week long fight, here is the solution:

code => "
            event.to_hash.each do |key,value|
                 if value.is_a?(Hash)
                        map[key] ||= {}
                        map[key].merge!(value)
                  else
                    map[key] = value
                  end
            end
            "

That will extract event_data nested values, and insert them correctly into the map.

2 Likes

This topic was automatically closed 28 days after the last reply. New replies are no longer allowed.