Logstash Filter

Hi Team,
I'm trying to get NSG Flow logs through Azure Blob Storage (logstash-input-azureblob (0.9.12)).

But the filtering part is not working properly, and the events are tagged with _jsonparsefailure, _split_type_failure, and _dateparsefailure.

Types of logs:
{
"records": [{
"time": "2021-03-02T11:00:06.8483928Z",
"systemId": "",
"macAddress": "",
"category": "NetworkSecurityGroupFlowEvent",
"resourceId": "",
"operationName": "NetworkSecurityGroupFlowEvents",
"properties": {
"Version": 2,
"flows": [{
"rule": "DefaultRule_AllowInternetOutBound",
"flows": [{
"mac": "000D3A0A6B1F",
"flowTuples": ["1614682747,192.168.200.4,104.211.232.185,36368,443,T,O,A,E,8,1553,1,74", "1614682748,192.168.200.4,104.211.244.138,39948,9200,T,O,A,E,4,399,1,74", "1614682749,192.168.200.4,104.211.232.185,36396,443,T,O,A,B,,,,", "1614682753,192.168.200.4,104.211.232.185,36378,443,T,O,A,E,2,657,1,74", "1614682755,192.168.200.4,104.211.232.185,36404,443,T,O,A,B,,,,", "1614682759,192.168.200.4,104.211.232.185,36388,443,T,O,A,E,8,1553,1,74", "1614682761,192.168.200.4,104.211.232.185,36412,443,T,O,A,B,,,,", "1614682765,192.168.200.4,104.211.244.138,39992,9200,T,O,A,B,,,,", "1614682765,192.168.200.4,104.211.232.185,36396,443,T,O,A,E,8,1553,1,74", "1614682767,192.168.200.4,104.211.232.185,36422,443,T,O,A,B,,,,", "1614682768,192.168.200.4,20.43.120.7,46234,443,T,O,A,B,,,,", "1614682769,192.168.200.4,20.43.120.7,46236,443,T,O,A,B,,,,", "1614682771,192.168.200.4,20.43.120.7,46238,443,T,O,A,B,,,,", "1614682771,192.168.200.4,104.211.232.185,36404,443,T,O,A,E,8,1553,1,74", "1614682773,192.168.200.4,104.211.232.185,36446,443,T,O,A,B,,,,", "1614682777,192.168.200.4,104.211.232.185,36412,443,T,O,A,E,8,1553,1,74", "1614682779,192.168.200.4,104.211.232.185,36456,443,T,O,A,B,,,,", "1614682782,192.168.200.4,104.211.244.138,39992,9200,T,O,A,E,4,399,1,74", "1614682783,192.168.200.4,104.211.232.185,36422,443,T,O,A,E,8,1553,1,74", "1614682785,192.168.200.4,104.211.232.185,36464,443,T,O,A,B,,,,", "1614682785,192.168.200.4,20.43.120.7,46234,443,T,O,A,E,1,74,1,66", "1614682786,192.168.200.4,20.43.120.7,46236,443,T,O,A,E,1,74,1,66", "1614682788,192.168.200.4,20.43.120.7,46238,443,T,O,A,E,1,74,1,66", "1614682790,192.168.200.4,104.211.232.185,36446,443,T,O,A,E,8,1553,1,74", "1614682791,192.168.200.4,104.211.232.185,36472,443,T,O,A,B,,,,", "1614682796,192.168.200.4,104.211.232.185,36456,443,T,O,A,E,8,1553,1,74", "1614682797,192.168.200.4,104.211.232.185,36480,443,T,O,A,B,,,,", "1614682802,192.168.200.4,104.211.232.185,36464,443,T,O,A,E,8,1553,1,74"]
}]
}, {
"rule": "DefaultRule_DenyAllInBound",
"flows": [{
"mac": "",
"flowTuples": ["1614682743,149.81.167.130,192.168.200.4,49192,56596,T,I,D,B,,,,", "1614682770,162.142.125.159,192.168.200.4,51377,6021,T,I,D,B,,,,", "1614682773,167.248.133.30,192.168.200.4,62339,8010,T,I,D,B,,,,", "1614682775,162.142.125.146,192.168.200.4,2654,9015,T,I,D,B,,,,", "1614682779,162.142.125.156,192.168.200.4,22192,9950,T,I,D,B,,,,", "1614682779,162.142.125.147,192.168.200.4,21145,52869,T,I,D,B,,,,", "1614682782,103.127.186.26,192.168.200.4,53363,445,T,I,D,B,,,,"]
}]
}, {
"rule": "UserRule_Port_9200",
"flows": [{
"mac": "",
"flowTuples": ["1614682748,104.211.244.138,192.168.200.4,39948,9200,T,I,A,E,4,399,1,74", "1614682765,104.211.244.138,192.168.200.4,39992,9200,T,I,A,B,,,,", "1614682782,104.211.244.138,192.168.200.4,39992,9200,T,I,A,E,4,399,1,74", "1614682798,192.168.250.4,192.168.200.4,13963,9200,T,I,A,E,11,2337,11,3987"]
}]
}, {
"rule": "UserRule_SSH",
"flows": [{
"mac": "",
"flowTuples": ["1614682757,218.92.0.192,192.168.200.4,54565,22,T,I,A,B,,,,", "1614682782,218.92.0.192,192.168.200.4,54565,22,T,I,A,E,11,2037,23,5180", "1614682788,218.92.0.192,192.168.200.4,41062,22,T,I,A,B,,,,"]
}]
}]
}
},

The filters which are used:
# Filter stage for NSG flow log events: flattens the nested arrays down to one
# event per flow tuple, then derives fields from the resource ID and the tuple.
filter {
# One event per element of the top-level [records] array.
split {
field => "[records]"
}
# One event per rule entry under [records][properties][flows].
split {
field => "[records][properties][flows]"
}
# One event per inner flows entry (the objects holding mac / flowTuples).
split {
field => "[records][properties][flows][flows]"
}
# One event per flow-tuple string.
split {
field => "[records][properties][flows][flows][flowTuples]"
}
# NOTE(review): every operation below lives in ONE mutate. mutate applies its
# operations in a fixed internal order, not the order written here, so the
# convert steps presumably do not see the fields that add_field creates in this
# same mutate -- confirm against the mutate plugin documentation.
# NOTE(review): split, add_field and convert each appear more than once in this
# mutate; verify how repeated options are combined.
mutate {
# Break the resource ID into its path segments.
split => {
"[records][resourceId]" => "/"
}
# Segments 2, 4 and 8 are used as subscription, resource group and NSG name.
# Assumes a resource ID shaped like /SUBSCRIPTIONS/<id>/RESOURCEGROUPS/<rg>/... -- TODO confirm
# (resourceId is empty in the sample log).
add_field => {
"Subscription" => "%{[records][resourceId][2]}"
"ResourceGroup" => "%{[records][resourceId][4]}"
"NetworkSecurityGroup" => "%{[records][resourceId][8]}"
}
convert => {
"Subscription" => "string"
}
convert => {
"ResourceGroup" => "string"
}
convert => {
"NetworkSecurityGroup" => "string"
}
# Break the comma-separated flow tuple into an array of values.
split => {
"[records][properties][flows][flows][flowTuples]" => ","
}
# Only the first eight values of the tuple are extracted; the sample tuples
# carry up to 13 comma-separated values.
add_field => {
"unixtimestamp" => "%{[records][properties][flows][flows][flowTuples][0]}"
"srcIp" => "%{[records][properties][flows][flows][flowTuples][1]}"
"destIp" => "%{[records][properties][flows][flows][flowTuples][2]}"
"srcPort" => "%{[records][properties][flows][flows][flowTuples][3]}"
"destPort" => "%{[records][properties][flows][flows][flowTuples][4]}"
"protocol" => "%{[records][properties][flows][flows][flowTuples][5]}"
"trafficflow" => "%{[records][properties][flows][flows][flowTuples][6]}"
"traffic" => "%{[records][properties][flows][flows][flowTuples][7]}"
}
convert => {
"unixtimestamp" => "integer"
}
convert => {
"srcPort" => "integer"
}
convert => {
"destPort" => "integer"
}
}
# Parse the tuple's first value using the UNIX pattern.
date {
match => ["unixtimestamp", "UNIX"]
}}

I want to split the 'value' field, which is a JSON array, into multiple JSON events. Please help me filter this properly.

Please guide. Thank you in advance.

This part is fine

    # Each split emits one event per array element, descending one nesting level at a time.
    split { field => "[records]" }
    split { field => "[records][properties][flows]" }
    split { field => "[records][properties][flows][flows]" }
    split { field => "[records][properties][flows][flows][flowTuples]" }

That gets you records like

{
       "records" => {
           "resourceId" => "",
           "properties" => {
            "Version" => 2,
              "flows" => {
                 "rule" => "DefaultRule_AllowInternetOutBound",
                "flows" => {
                           "mac" => "000D3A0A6B1F",
                    "flowTuples" => "1614682747,192.168.200.4,104.211.232.185,36368,443,T,O,A,E,8,1553,1,74"
                }
            }
        },
             "category" => "NetworkSecurityGroupFlowEvent",
        "operationName" => "NetworkSecurityGroupFlowEvents",
           "macAddress" => "",
                 "time" => "2021-03-02T11:00:06.8483928Z",
             "systemId" => ""
    }
    ...
}

The problem is in your mutate filter. Within a single mutate filter, operations are processed in a fixed order (coerce first, then rename, and so on), regardless of the order in which you write them. You can check the code to see the order of the operations and of the common options. You will need to use multiple mutate filters to force the order you want.

Thanks a lot :innocent:

This topic was automatically closed 28 days after the last reply. New replies are no longer allowed.