Filtering incoming syslog (CSV) with columns containing JSON

I'm importing syslog messages from Palo Alto's Cortex Hub (specifically Traps logs) using logstash's syslog importer. That works great.

The issue I'm running into is that these logs (in syslog format) contain json in 5 different columns, one of which has nested lists. The same ',' delimiter is used through the whole message so when the csv filter runs through it it breaks the json into many different columns.

I'm trying to avoid hard-coding these columns and merging them back together after the csv filter runs in case the number of items in some of the json lists change. It seems messy to do it that way.

I've run through attempting to pre-filter the incoming message with gsub, but ruby doesn't support the regex syntax I need to be able to handle nested lists.


ruby regex doesn't support (*SKIP)

Since pre-filtering the incoming message still seemed like a good idea I looked into whether I could use python to run this regex on the message and pass it on. I ran through the example provided by vitaly in this thread, but I'm unable to get the example working, it seems the 2 year old code no longer works with logstash 6.4. Supposedly, the shell that gets created by open3 can't find my script.

I may be over-complicating this issue but the only references I've found to multiple json fields in an incoming syslog message recommended to change the delimiter in the message.

I'll include a copy of one of the logs I'm attempting to filter.

"1362 <134>1 2019-04-09T23:52:41.143Z sc-1551361917-logforwardercomp-5ca269c32fc4dd0040f17ae4-852fnzc logforwarder 24 panwlogs - threat,threat,,AgentSecurityEvent,2019-04-09T23:52:24.279Z,2019-04-09T23:52:26.683Z,2019-04-09T23:52:24.279Z,-420,,TrapsAgent,1551361817,2338173641220014550,prodngb-core-op-reports-i-05e5ad83ad86bbb9b,2.1.1+11483,10,1,856e25601445f4ddf04e7db5dd8d29e6,1,0,10.0.14393,1,,IP-AC1FFA22,domain.local,0,2,,52-8145,0,e35da457d77a49b5aa187410b57268f4,COMPONENT_WILDFIRE,Malware,CYSTATUS_MALICIOUS_EXE,1,reported,0,,,0,0,['D:\\\\Users\\\\user\\\\Downloads\\\\wildfire-test-pe-file.exe','B8A8BAFBC37C49BF76CA27180E81254663649F9C19FB1F510816A9F3441A5647','B8A8BAFBC37C49BF76CA27180E81254663649F9C19FB1F510816A9F3441A5647','1'],0,-1,0,[{'pid':3748,'parentId':8332,'exeFileIdx':0,'userIdx':0,'commandLine':'\\'D:\\\\Users\\\\user\\\\Downloads\\\\wildfire-test-pe-file.exe\\' ','instanceId':'AdTvL0dQ5xkAAA6kAAAAAA==','terminated':0,'reportIds':[],'threads':[]}],[{'rawFullPath':'D:\\\\Users\\\\user\\\\Downloads\\\\wildfire-test-pe-file.exe','fileName':'wildfire-test-pe-file.exe','sha256':'B8A8BAFBC37C49BF76CA27180E81254663649F9C19FB1F510816A9F3441A5647','fileSize':55296,'innerObjectSha256':'B8A8BAFBC37C49BF76CA27180E81254663649F9C19FB1F510816A9F3441A5647'}],[{'userName':'user','userDomain':'domain.local'}],[],WildFire Malware\n"

I'm looking for better approaches to this problem, or perhaps solutions to my attempts that have failed. I should point out that I want to define the columns for the syslog message, rather than leaving it all in the message field as I'm outputting this as JSON to kinesis and streamalert.

Turns out I was overcomplicating the issue.

I ended up taking a more simple approach, wrapping the embedded json in single quotes using

mutate {
  gsub => ["message", "(\[(?:[^\]\[]|\g<1>)*\])", "'\1'"]

Setting the csv filter to quote_char => "'" allowed it to consume the entire json list as a single column.

I was then able to target the fields with the json importer.

This topic was automatically closed 28 days after the last reply. New replies are no longer allowed.