Hi Team,
I'm trying to get NSG Flow logs through Azure Blob Storage (logstash-input-azureblob (0.9.12)).
But the filtering part is not working properly and showing the errors _jsonparsefailure, _split_type_failure, _dateparsefailure .
Types of logs:
{
"records": [{
"time": "2021-03-02T11:00:06.8483928Z",
"systemId": "",
"macAddress": "",
"category": "NetworkSecurityGroupFlowEvent",
"resourceId": "",
"operationName": "NetworkSecurityGroupFlowEvents",
"properties": {
"Version": 2,
"flows": [{
"rule": "DefaultRule_AllowInternetOutBound",
"flows": [{
"mac": "000D3A0A6B1F",
"flowTuples": ["1614682747,192.168.200.4,104.211.232.185,36368,443,T,O,A,E,8,1553,1,74", "1614682748,192.168.200.4,104.211.244.138,39948,9200,T,O,A,E,4,399,1,74", "1614682749,192.168.200.4,104.211.232.185,36396,443,T,O,A,B,,,,", "1614682753,192.168.200.4,104.211.232.185,36378,443,T,O,A,E,2,657,1,74", "1614682755,192.168.200.4,104.211.232.185,36404,443,T,O,A,B,,,,", "1614682759,192.168.200.4,104.211.232.185,36388,443,T,O,A,E,8,1553,1,74", "1614682761,192.168.200.4,104.211.232.185,36412,443,T,O,A,B,,,,", "1614682765,192.168.200.4,104.211.244.138,39992,9200,T,O,A,B,,,,", "1614682765,192.168.200.4,104.211.232.185,36396,443,T,O,A,E,8,1553,1,74", "1614682767,192.168.200.4,104.211.232.185,36422,443,T,O,A,B,,,,", "1614682768,192.168.200.4,20.43.120.7,46234,443,T,O,A,B,,,,", "1614682769,192.168.200.4,20.43.120.7,46236,443,T,O,A,B,,,,", "1614682771,192.168.200.4,20.43.120.7,46238,443,T,O,A,B,,,,", "1614682771,192.168.200.4,104.211.232.185,36404,443,T,O,A,E,8,1553,1,74", "1614682773,192.168.200.4,104.211.232.185,36446,443,T,O,A,B,,,,", "1614682777,192.168.200.4,104.211.232.185,36412,443,T,O,A,E,8,1553,1,74", "1614682779,192.168.200.4,104.211.232.185,36456,443,T,O,A,B,,,,", "1614682782,192.168.200.4,104.211.244.138,39992,9200,T,O,A,E,4,399,1,74", "1614682783,192.168.200.4,104.211.232.185,36422,443,T,O,A,E,8,1553,1,74", "1614682785,192.168.200.4,104.211.232.185,36464,443,T,O,A,B,,,,", "1614682785,192.168.200.4,20.43.120.7,46234,443,T,O,A,E,1,74,1,66", "1614682786,192.168.200.4,20.43.120.7,46236,443,T,O,A,E,1,74,1,66", "1614682788,192.168.200.4,20.43.120.7,46238,443,T,O,A,E,1,74,1,66", "1614682790,192.168.200.4,104.211.232.185,36446,443,T,O,A,E,8,1553,1,74", "1614682791,192.168.200.4,104.211.232.185,36472,443,T,O,A,B,,,,", "1614682796,192.168.200.4,104.211.232.185,36456,443,T,O,A,E,8,1553,1,74", "1614682797,192.168.200.4,104.211.232.185,36480,443,T,O,A,B,,,,", "1614682802,192.168.200.4,104.211.232.185,36464,443,T,O,A,E,8,1553,1,74"]
}]
}, {
"rule": "DefaultRule_DenyAllInBound",
"flows": [{
"mac": "",
"flowTuples": ["1614682743,149.81.167.130,192.168.200.4,49192,56596,T,I,D,B,,,,", "1614682770,162.142.125.159,192.168.200.4,51377,6021,T,I,D,B,,,,", "1614682773,167.248.133.30,192.168.200.4,62339,8010,T,I,D,B,,,,", "1614682775,162.142.125.146,192.168.200.4,2654,9015,T,I,D,B,,,,", "1614682779,162.142.125.156,192.168.200.4,22192,9950,T,I,D,B,,,,", "1614682779,162.142.125.147,192.168.200.4,21145,52869,T,I,D,B,,,,", "1614682782,103.127.186.26,192.168.200.4,53363,445,T,I,D,B,,,,"]
}]
}, {
"rule": "UserRule_Port_9200",
"flows": [{
"mac": "",
"flowTuples": ["1614682748,104.211.244.138,192.168.200.4,39948,9200,T,I,A,E,4,399,1,74", "1614682765,104.211.244.138,192.168.200.4,39992,9200,T,I,A,B,,,,", "1614682782,104.211.244.138,192.168.200.4,39992,9200,T,I,A,E,4,399,1,74", "1614682798,192.168.250.4,192.168.200.4,13963,9200,T,I,A,E,11,2337,11,3987"]
}]
}, {
"rule": "UserRule_SSH",
"flows": [{
"mac": "",
"flowTuples": ["1614682757,218.92.0.192,192.168.200.4,54565,22,T,I,A,B,,,,", "1614682782,218.92.0.192,192.168.200.4,54565,22,T,I,A,E,11,2037,23,5180", "1614682788,218.92.0.192,192.168.200.4,41062,22,T,I,A,B,,,,"]
}]
}]
}
},
The filters which are used:
filter {
  # Purpose: turn one NSG flow-log document (a "records" array holding nested
  # "flows" arrays of comma-separated flow tuples) into one event per flow
  # tuple, with the tuple's leading columns exposed as top-level fields.
  #
  # NOTE(review): the input section is not visible here. _jsonparsefailure is
  # raised while decoding JSON, not by anything in this block, so presumably it
  # comes from the input codec; once decoding fails, [records] is not an array
  # (-> _split_type_failure) and the tuple fields never resolve
  # (-> _dateparsefailure). Verify the input's codec and head/tail byte
  # settings against the actual blob layout.

  # Fan out each nesting level in turn: record -> rule -> flow group -> tuple.
  # After the last split, [flowTuples] holds a single comma-separated string.
  split {
    field => "[records]"
  }
  split {
    field => "[records][properties][flows]"
  }
  split {
    field => "[records][properties][flows][flows]"
  }
  split {
    field => "[records][properties][flows][flows][flowTuples]"
  }

  # Stage 1: tokenise the resource id on "/" and the flow tuple on "," so the
  # individual parts can be addressed by index in the next stage.
  mutate {
    split => {
      "[records][resourceId]" => "/"
      "[records][properties][flows][flows][flowTuples]" => ","
    }
  }

  # Stage 2: copy the indexed parts into top-level fields.
  mutate {
    add_field => {
      "Subscription" => "%{[records][resourceId][2]}"
      "ResourceGroup" => "%{[records][resourceId][4]}"
      "NetworkSecurityGroup" => "%{[records][resourceId][8]}"
      "unixtimestamp" => "%{[records][properties][flows][flows][flowTuples][0]}"
      "srcIp" => "%{[records][properties][flows][flows][flowTuples][1]}"
      "destIp" => "%{[records][properties][flows][flows][flowTuples][2]}"
      "srcPort" => "%{[records][properties][flows][flows][flowTuples][3]}"
      "destPort" => "%{[records][properties][flows][flows][flowTuples][4]}"
      "protocol" => "%{[records][properties][flows][flows][flowTuples][5]}"
      "trafficflow" => "%{[records][properties][flows][flows][flowTuples][6]}"
      "traffic" => "%{[records][properties][flows][flows][flowTuples][7]}"
    }
  }

  # Stage 3: type conversion lives in its own mutate on purpose. Inside a
  # single mutate, "convert" is executed before "split", and "add_field" is
  # only applied after the filter has succeeded, so converting fields in the
  # same block that creates them is a no-op and the values stay strings.
  mutate {
    convert => {
      "Subscription" => "string"
      "ResourceGroup" => "string"
      "NetworkSecurityGroup" => "string"
      "unixtimestamp" => "integer"
      "srcPort" => "integer"
      "destPort" => "integer"
    }
  }

  # Use the tuple's epoch-seconds value (first tuple column) as the event
  # timestamp.
  date {
    match => ["unixtimestamp", "UNIX"]
  }
}
I want to split the 'value' field that is a JSON array into multiple JSONs. Please help me to filter this properly.
Please guide. Thank you in advance.