Hi,
I'm pretty new to the ELK world, and I'm now trying to parse Office 365 (O365) audit logs. (I used to use Excel, then I heard about the ELK stack.)
So here is some test data:
CreationDate,UserIds,Operations,AuditData
3462-12-12T15:13:18.0000000Z,username@foodomain.com,UserLoginFailed,"{""CreationTime"":""3462-12-12T15:13:18"",""Id"":""c15ac09c-4b6d-46ea-b400-d367bf415109"",""Operation"":""UserLoginFailed"",""OrganizationId"":""e6de5d22-352f-488c-a4fd-64ecb2a50f19"",""RecordType"":15,""ResultStatus"":""Failed"",""UserKey"":""10033FFF9DCD26FC@foodomain.com"",""UserType"":0,""Version"":1,""Workload"":""AzureActiveDirectory"",""ClientIP"":""431.63.751.958"",""ObjectId"":""Unknown"",""UserId"":""username@foodomain.com"",""AzureActiveDirectoryEventType"":1,""ExtendedProperties"":[{""Name"":""UserAgent"",""Value"":""Android\/5.1.1-EAS-2.0""},{""Name"":""RequestType"",""Value"":""OrgIdWsTrust2:process""},{""Name"":""ResultStatusDetail"",""Value"":""UserError""}],""Actor"":[{""ID"":""ea4eef3a-6796-46a0-72cecc133a2e"",""Type"":0},{""ID"":""username@foodomain.com"",""Type"":5},{""ID"":""10033FFF9DCD26FC"",""Type"":3}],""ActorContextId"":""e6de5d22-488c-a4fd-64ecb2a50f19"",""ActorIpAddress"":""431.63.751.958"",""InterSystemsId"":""dedbf105-4406-a462-b3fd9f1accef"",""IntraSystemId"":""412785f4-8112-83c1-4ddd23022600"",""Target"":[{""ID"":""Unknown"",""Type"":0}],""TargetContextId"":""e6de5d22-488c-a4fd-64ecb2a50f19"",""ApplicationId"":""bfc44fc5-2fe3-98ec-1e5967475f68"",""LogonError"":""InvalidUserNameOrPassword""}"
This is basically a CSV file with 4 fields:
CreationDate, UserIds, Operations, AuditData. However, the AuditData column contains a whole nested data structure in key:value (JSON) format.
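To make the nesting easier to see, here is the AuditData value from that sample line, pretty-printed and trimmed to a few fields:

{
  "CreationTime": "3462-12-12T15:13:18",
  "Operation": "UserLoginFailed",
  "ResultStatus": "Failed",
  "UserId": "username@foodomain.com",
  "ExtendedProperties": [
    { "Name": "UserAgent", "Value": "Android/5.1.1-EAS-2.0" },
    { "Name": "RequestType", "Value": "OrgIdWsTrust2:process" }
  ],
  "Actor": [
    { "ID": "username@foodomain.com", "Type": 5 }
  ],
  "LogonError": "InvalidUserNameOrPassword"
}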
Reading the "getting started with XY" articles helped me to setup the elasticsearch, kibana, logstash, filebeat, and I also know about the filters.
I used the csv filter, so the file is now properly parsed as CSV. However, the last column needs an additional filter (the kv filter seems good for the job), but I'm not sure how I should construct my pipeline file to make that happen. It currently looks like this:
input {
  beats {
    port => "5044"
  }
}

filter {
  csv {
    autodetect_column_names => true
  }
}

output {
  elasticsearch {
    hosts => [ "localhost:9200" ]
  }
}
To complicate things a bit, some key-value pairs have nested key-value data, and not all log lines contain the same set of properties.
These logs can be searched very comfortably via the Exchange Online portal, but I usually have to deal with them offline.
So the question is: how can I apply an additional filter to data that has already been filtered?
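What I was thinking (just a sketch, I haven't verified it yet) is to add a second filter after the csv one. Since the AuditData value is actually JSON, maybe the json filter is a better fit than kv? Assuming the csv filter puts that column into a field named AuditData, something like:

filter {
  csv {
    autodetect_column_names => true
  }
  # guess: parse the JSON string in the AuditData column into structured fields
  json {
    source => "AuditData"
    target => "audit"   # "audit" is just a name I made up for the parsed object
  }
}

Would that be the right approach, or is there a better way to chain filters like this?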