I'm working with auditd, and I'm trying to take multiple events that share the same audit ID and merge them into one new document.
The end goal is to still index the original documents, but also produce a final document that contains all of the unique fields from the set.
I'm already processing the documents in a few ways (e.g., shipping them as JSON via rsyslog and parsing them with the json and kv filters), but I'm running into issues working with nested values.
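To give a rough idea of the parsing side, my pipeline looks something along these lines (a simplified sketch, not my exact config; the real rsyslog template and filter options differ, but this is how the nested event_data field gets built):

filter {
  # rsyslog ships each line as JSON; parse it into the nested event_data field
  json {
    source => "message"
    target => "event_data"
  }
  # the audit record inside it is key=value formatted, so expand those pairs
  # into the same nested field
  kv {
    source => "[event_data][message]"
    target => "event_data"
  }
}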
Here is a sample of some documents:
1st Log:
{
  "_index": "test-2017.11.16",
  "_type": "log",
  "_id": "aIeixl8BqbGtc_MpEfsx",
  "_version": 1,
  "_score": null,
  "_source": {
    "offset": 80594529,
    "input_type": "log",
    "source": "/data/var/log/remotehosts/ubuntu.2017-11-16.log",
    "event_data": {
      "syscall": "42",
      "gid": "0",
      "fsgid": "0",
      "programname": "audispd",
      "pid": "12109",
      "suid": "0",
      "type": "SYSCALL",
      "uid": "0",
      "egid": "0",
      "exe": "/usr/bin/wget",
      "audit": "1510866028.145:162548",
      "@version": "1",
      "fromhost-ip": "127.0.0.1",
      "sgid": "0",
      "sysloghost": "ubuntu",
      "inputname": "imuxsock",
      "key": "network_outbound6",
      "severity": "info",
      "ses": "4294967295",
      "auid": "4294967295",
      "comm": "wget",
      "euid": "0",
      "procid": "-",
      "message": " node=ubuntu type=SYSCALL audit=1510866028.145:162548 arch=c000003e syscall=42 success=yes exit=0 a0=4 a1=7fffffffda10 a2=10 a3=0 items=0 ppid=5201 pid=12109 auid=4294967295 uid=0 gid=0 euid=0 suid=0 fsuid=0 egid=0 sgid=0 fsgid=0 tty=pts19 ses=4294967295 comm=\"wget\" exe=\"/usr/bin/wget\" key=\"network_outbound6\"",
      "a0": "4",
      "ppid": "5201",
      "a1": "7fffffffda10",
      "fsuid": "0",
      "node": "ubuntu",
      "exit": "0",
      "a2": "10",
      "a3": "0",
      "@timestamp": "2017-11-16T16:00:28.194605-05:00",
      "success": "yes",
      "tty": "pts19",
      "arch": "c000003e",
      "facility": "user",
      "items": "0"
    },
    "message": "{\"@timestamp\":\"2017-11-16T16:00:28.194605-05:00\",\"@version\":\"1\",\"message\":\" node=ubuntu type=SYSCALL msg=audit(1510866028.145:162548): arch=c000003e syscall=42 success=yes exit=0 a0=4 a1=7fffffffda10 a2=10 a3=0 items=0 ppid=5201 pid=12109 auid=4294967295 uid=0 gid=0 euid=0 suid=0 fsuid=0 egid=0 sgid=0 fsgid=0 tty=pts19 ses=4294967295 comm=\\\"wget\\\" exe=\\\"\\/usr\\/bin\\/wget\\\" key=\\\"network_outbound6\\\"\",\"sysloghost\":\"ubuntu\",\"severity\":\"info\",\"facility\":\"user\",\"programname\":\"audispd\",\"procid\":\"-\",\"inputname\":\"imuxsock\",\"fromhost-ip\":\"127.0.0.1\"}",
    "type": "log",
    "tags": [
      "beats_input_codec_plain_applied"
    ],
    "insertTime": "2017-11-16T21:00:30.000Z",
    "@timestamp": "2017-11-16T21:00:28.832Z",
    "@version": "1",
    "beat": {
      "name": "ubuntu",
      "hostname": "ubuntu",
      "version": "5.6.3"
    },
    "host": "ubuntu"
  },
  "fields": {
    "insertTime": [
      "2017-11-16T21:00:30.000Z"
    ],
    "@timestamp": [
      "2017-11-16T21:00:28.832Z"
    ],
    "event_data.@timestamp": [
      "2017-11-16T21:00:28.194Z"
    ]
  },
  "highlight": {
    "event_data.audit": [
      "@kibana-highlighted-field@1510866028.145@/kibana-highlighted-field@:@kibana-highlighted-field@162548@/kibana-highlighted-field@"
    ]
  },
  "sort": [
    1510866030000
  ]
}
2nd Log:
{
  "_index": "test0-2017.11.16",
  "_type": "log",
  "_id": "aYeixl8BqbGtc_MpEfsx",
  "_version": 1,
  "_score": null,
  "_source": {
    "offset": 80594850,
    "input_type": "log",
    "source": "/data/var/log/remotehosts/ubuntu.2017-11-16.log",
    "event_data": {
      "severity": "info",
      "saddr": "02000050D02B66FA0000000000000000",
      "programname": "audispd",
      "procid": "-",
      "message": " node=ubuntu type=SOCKADDR audit=1510866028.145:162548 saddr=02000050D02B66FA0000000000000000",
      "type": "SOCKADDR",
      "node": "ubuntu",
      "@timestamp": "2017-11-16T16:00:28.194758-05:00",
      "audit": "1510866028.145:162548",
      "@version": "1",
      "fromhost-ip": "127.0.0.1",
      "sysloghost": "ubuntu",
      "inputname": "imuxsock",
      "facility": "user"
    },
    "message": "{\"@timestamp\":\"2017-11-16T16:00:28.194758-05:00\",\"@version\":\"1\",\"message\":\" node=ubuntu type=SOCKADDR msg=audit(1510866028.145:162548): saddr=02000050D02B66FA0000000000000000\",\"sysloghost\":\"ubuntu\",\"severity\":\"info\",\"facility\":\"user\",\"programname\":\"audispd\",\"procid\":\"-\",\"inputname\":\"imuxsock\",\"fromhost-ip\":\"127.0.0.1\"}",
    "type": "log",
    "tags": [
      "beats_input_codec_plain_applied"
    ],
    "insertTime": "2017-11-16T21:00:30.000Z",
    "@timestamp": "2017-11-16T21:00:28.832Z",
    "@version": "1",
    "beat": {
      "name": "ubuntu",
      "hostname": "ubuntu",
      "version": "5.6.3"
    },
    "host": "ubuntu"
  },
  "fields": {
    "insertTime": [
      "2017-11-16T21:00:30.000Z"
    ],
    "@timestamp": [
      "2017-11-16T21:00:28.832Z"
    ],
    "event_data.@timestamp": [
      "2017-11-16T21:00:28.194Z"
    ]
  },
  "highlight": {
    "event_data.audit": [
      "@kibana-highlighted-field@1510866028.145@/kibana-highlighted-field@:@kibana-highlighted-field@162548@/kibana-highlighted-field@"
    ]
  },
  "sort": [
    1510866030000
  ]
}
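To make the end goal concrete, the merged document I'd like to end up with for this audit ID would look roughly like this (a hand-built sketch, abbreviated to the distinguishing fields; duplicated fields would simply be overwritten):

{
  "event_data": {
    "audit": "1510866028.145:162548",
    "node": "ubuntu",
    "type": "SYSCALL",
    "syscall": "42",
    "exe": "/usr/bin/wget",
    "comm": "wget",
    "key": "network_outbound6",
    "saddr": "02000050D02B66FA0000000000000000"
  },
  "host": "ubuntu",
  "@timestamp": "2017-11-16T21:00:28.832Z"
}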
What I've been able to do is use the aggregate filter to pivot off of the audit ID (event_data.audit), which lets me identify the multiple events associated with these log entries. Where I start having issues is working with the nested event_data values when trying to insert/merge these documents into a new document.
Here is what my current aggregate filter looks like:
aggregate {
  task_id => "%{[event_data][audit]}"
  code => "
    # merge the nested event_data fields into the aggregate map
    map.merge!(event.get('[event_data]'))
    # then copy every top-level field of the event into the map as well
    event.to_hash.each do |key, value|
      map[key] = value
    end
  "
  timeout_tags => ["custom_timeout_tag"]
  push_map_as_event_on_timeout => true
  timeout => 5
}
What this does is create a similar document, but all of the event_data fields become top-level fields instead of staying nested under event_data. I'm fine with it clobbering/overwriting duplicate data, as long as the unique fields are retained (e.g., in this example, event_data.saddr and event_data.exe).
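Conceptually, what I'm trying to express is something along these lines (a rough sketch only; I'm not sure this is the right way to reference nested fields in the aggregate map):

aggregate {
  task_id => "%{[event_data][audit]}"
  code => "
    # keep the audit fields nested under event_data instead of flattening them
    map['event_data'] ||= {}
    map['event_data'].merge!(event.get('event_data') || {})
    # copy the remaining top-level fields, letting later events overwrite earlier ones
    event.to_hash.each do |key, value|
      map[key] = value unless key == 'event_data'
    end
  "
  push_map_as_event_on_timeout => true
  timeout_tags => ["custom_timeout_tag"]
  timeout => 5
}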
Is there a way to correctly call/insert into a nested field with the aggregate filter?