Splitting Values in Data

I have been searching the internet for a few hours now trying to solve this problem. I am currently parsing data from Cuckoo Sandbox (automated malware analysis), which arrives as key-value pairs. The issue I am running into is that some of the values are semicolon-separated, and I would like to add them to my DB as an array.

I have attempted to split them using the KV filter, but I am either not using it as intended or am going in the wrong direction.

The message comes across as:

Timestamp="2015/08/01 13:52:56" id="24814" Submission="file" MD5="d18d493b20d68a37cc5bbf0dbeb72f46" SHA1="bf5ac56e8b9884c825a95499ad9f2a63f733054e" File_Name="d18d493b20d68a37cc5bbf0dbeb72f46" File_Size="45782" File_Type="HTML document, UTF-8 Unicode text, with very long lines, with CRLF, LF line terminators" MalScore="0.5" Related_IPs="-" Related_Domains="static.4shared.com;c.statcounter.com;secure.quantserve.com;www.statcounter.com" Total_TCP="0" Total_UDP="68"Virustotal="Not Found" Cuckoo_Sigs="injection_rwx" Yara="-" 

I am currently parsing the data to the following format... I am very new to Grok filtering, so if you have any recommendations outside of my request, please feel free to provide input:

"year": "2015",
"month": "08",
"day": "01",
"time": "13:52:56",
"id": "24814",
"submission": "file",
"md5": "d18d493b20d68a37cc5bbf0dbeb72f46",
"sha1": "bf5ac56e8b9884c825a95499ad9f2a63f733054e",
"filename": "d18d493b20d68a37cc5bbf0dbeb72f46",
"filesize": "45782",
"filetype": "HTML document, UTF-8 Unicode text, with very long lines, with CRLF, LF line terminators",
"malscore": "0.5",
"relatedips": "-",
"relateddomains": "static.4shared.com;c.statcounter.com;secure.quantserve.com;www.statcounter.com",
"totaltcp": "0",
"totaludp": "68",
"virustotal": "Not Found",
"cuckoosigs": "injection_rwx",
"yara": "-""

I am attempting to split three fields: relatedips and relateddomains on ";", and filetype on ",". For example:
"relateddomains": "static.4shared.com;c.statcounter.com;secure.quantserve.com;www.statcounter.com"

Any help would truly be appreciated.

Jim

Try using a conditional, that if those fields exist then run another KV filter using ; as a separator?
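Roughly along these lines (an untested sketch, no promises; the field name is taken from your parsed output above):

```
filter {
    if [relateddomains] {
        kv {
            source => "relateddomains"
            field_split => ";"
        }
    }
}
```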

Thank you for the recommendation. I have tried a few bits of code to set up these conditionals but keep running into an error on restart.

Do you mind providing an example?

What's the error you are getting and what does your config look like?

You could also use grok to break it up, then a KV filter on the relateddomains field.
But if you can post what you have it'll make it easier to adapt :slight_smile:

Here is what the working config looks like:

filter {
        if "cuckoo" in [tags] {
                grok {
                        patterns_dir => "/opt/logstash/patterns"
                        match => [ 'message', 'Timestamp="%{PF_DATE_TIME:starttime}" id="%{NUMBER:id}" Submission="%{WORD:submission}" MD5="%{WORD:md5}" SHA1="%{WORD:sha1}" File_Name="%{WORD:filename}" File_Size="%{NUMBER:filesize}" File_Type="%{GREEDYDATA:filetype}" MalScore="%{NUMBER:malscore}" Related_IPs="%{GREEDYDATA:relatedips}" Related_Domains="%{GREEDYDATA:relateddomains}" Total_TCP="%{NUMBER:totaltcp}" Total_UDP="%{NUMBER:totaludp}"Virustotal="%{GREEDYDATA:virustotal}" Cuckoo_Sigs="%{GREEDYDATA:cuckoosigs}" Yara="%{GREEDYDATA:yara}" ' ]
                        add_tag => ["nomalfamily"]
                        remove_tag => ["_grokparsefailure_sysloginput"]
                        named_captures_only => true
                }

        }
}
filter {
        if "cuckoo" in [tags] {
                if "_grokparsefailure" in [tags] {
                        grok {
                                patterns_dir => "/opt/logstash/patterns"
                                match => [ 'message', 'Timestamp="%{PF_DATE_TIME:starttime}" id="%{NUMBER:id}" Submission="%{WORD:submission}" MD5="%{WORD:md5}" SHA1="%{WORD:sha1}" File_Name="%{WORD:filename}" File_Size="%{NUMBER:filesize}" File_Type="%{GREEDYDATA:filetype}" MalScore="%{NUMBER:malscore}" MalFamily="%{GREEDYDATA:malfamily}" Related_IPs="%{GREEDYDATA:relatedips}" Related_Domains="%{GREEDYDATA:relateddomains}" Total_TCP="%{NUMBER:totaltcp}" Total_UDP="%{NUMBER:totaludp}"Virustotal="%{GREEDYDATA:virustotal}" Cuckoo_Sigs="%{GREEDYDATA:cuckoosigs}" Yara="%{GREEDYDATA:yara}" ' ]
                                add_tag => [ "malfamily" ]
                                remove_tag => [ "_grokparsefailure" ]
                                named_captures_only => true
                        }
                }
        }
}

I added the second filter block to handle messages that include 'MalFamily'. I need to condense it into the main block to clean up the code.
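One possible way to condense the two blocks into one (a sketch, untested): grok patterns are regular expressions, so the MalFamily portion can be wrapped in an optional non-capturing group. Only the relevant fragment of the match string is shown here; the rest stays as in the first grok block:

```
MalScore="%{NUMBER:malscore}"(?: MalFamily="%{DATA:malfamily}")? Related_IPs="%{GREEDYDATA:relatedips}"
```

Note the use of %{DATA} rather than %{GREEDYDATA} inside the optional group, so the match does not run past the field when MalFamily is present.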

This was a quick hack, but try this and/or something like this (if it doesn't work - no promises!).

filter {
        if "cuckoo" in [tags] {
                grok {
                        patterns_dir => "/opt/logstash/patterns"
                        match => [ 'message', 'Timestamp="%{PF_DATE_TIME:starttime}" id="%{NUMBER:id}" Submission="%{WORD:submission}" MD5="%{WORD:md5}" SHA1="%{WORD:sha1}" File_Name="%{WORD:filename}" File_Size="%{NUMBER:filesize}" File_Type="%{GREEDYDATA:filetype}" MalScore="%{NUMBER:malscore}" Related_IPs="%{GREEDYDATA:relatedips}" Related_Domains="%{GREEDYDATA:relateddomains}" Total_TCP="%{NUMBER:totaltcp}" Total_UDP="%{NUMBER:totaludp}"Virustotal="%{GREEDYDATA:virustotal}" Cuckoo_Sigs="%{GREEDYDATA:cuckoosigs}" Yara="%{GREEDYDATA:yara}" ' ]
                        add_tag => ["nomalfamily"]
                        remove_tag => ["_grokparsefailure_sysloginput"]
                        named_captures_only => true
                }
                kv {
                    field_split => ";"
                    source => "relateddomains"
                }
        }
}

An alternative to doing this in Logstash would be to have a custom tokenizer in Elasticsearch which splits on those characters explicitly.

Using an index template you could do something like this:

{
  "template": "cuckoo",
  "settings": {
        "index.analysis.analyzer.comma.type": "custom",
        "index.analysis.analyzer.comma.filter": [ "trim" ],
        "index.analysis.analyzer.comma.tokenizer": "commatokenizer",
        "index.analysis.tokenizer.commatokenizer.type": "pattern",
        "index.analysis.tokenizer.commatokenizer.pattern": ",",
        "index.analysis.analyzer.semicolon.type": "custom",
        "index.analysis.analyzer.semicolon.filter": [ "trim" ],
        "index.analysis.analyzer.semicolon.tokenizer": "semicolontokenizer",
        "index.analysis.tokenizer.semicolontokenizer.type": "pattern",
        "index.analysis.tokenizer.semicolontokenizer.pattern": ";"
    },
    "mappings": {
        "_default_": {
            "properties": {
                "relateddomains": {
                  "type": "string",
                  "analyzer": "semicolon"
                },
                "filetype": {
                  "type": "string",
                  "analyzer": "comma"
                }
            }
        }
    }
}
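If you go this route, you can sanity-check an analyzer with the _analyze API before indexing anything (assuming the index is named cuckoo, per the template above; syntax as of ES 1.x):

```
curl -XGET 'localhost:9200/cuckoo/_analyze?analyzer=semicolon' \
  -d 'static.4shared.com;c.statcounter.com'
```

The response should show one token per domain.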

Edit: I also don't see how the KV filter will help you, as the fields look to be multiple values under a single key, as opposed to individual key-value pairs.


I'd suggest it's better to do this in Logstash, so it's explicit what is happening in the processing pipeline.
Putting it in ES is a good idea, but someone can easily miss that if they aren't aware it's even possible :slight_smile:

I don't know that a general lack of understanding is a good reason to avoid the better solution, although I base that on the assumption the data is being loaded into ES. If that isn't true, it's probably more appropriate (with a very similar outcome to the ES mapping) to use the split function of the mutate filter to convert each field to an array of values.

The following config should accomplish this, though you may have some whitespace to trim in the filetype field.

filter {
    mutate {
        split => {
            "relatedips" => ";"
            "relateddomains" => ";"
            "filetype" => ","
        }
    }
}
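For the whitespace left over after splitting filetype on commas, a follow-up mutate with the strip option should work (untested sketch; strip removes leading and trailing whitespace, and should apply to each element when the field is an array):

```
filter {
    mutate {
        strip => [ "filetype" ]
    }
}
```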

"Better" solution is relative.
Having the entire processing config in LS is a lot saner; you aren't checking multiple places, for example.


Cheers. Not sure what changed between today and a few days ago... I ended up using the mutate splits to load the fields as arrays. Data is coming across as intended now.

I appreciate the quick and detailed responses.