Logstash to parse nested json arrays that contain different log formats

I am looking for help determining the best way to ingest JSON data from an AWS environment. Ordinarily this would not be difficult with Logstash, but the logs contain a nested JSON array that can mix many different log formats (Windows event logs, Apache logs, etc.). I am able to split the entries on the JSON array field, but I cannot figure out how to then parse the resulting events so that they are not raw text.

I have some experience using the dissect filter to assign custom fields based on the format seen, but this would require if statements covering up to 14 different log formats to parse all of them properly. Since a large share of these logs are Windows event logs, I would like to use Winlogbeat to parse them, but that will not work while they sit inside the nested JSON array. All of these are cold logs that I have to ingest, and the total number of log files to parse is well over 500,000.

I am looking for any advice on the best approach to this problem or, if I am already on the right path, any assistance. An example of the log format is below:

{"messageType": "DATA_MESSAGE", "owner": "1111111111", "logGroup": "vpc-1111111_11111111111_10.10.10.10_namehere.co", "logStream":  "111111111_name_logs", "subscriptionFilters": ["myLambdaTrigger"], "logEvents": [{"id": "1234567890", "timestamp": "1609459242232", "message": "warning, mail, Jan 1 00:00:41, servername, postfix/smtpd[1111]:, warning: log message is here - more info", "extractedFields": {bunch of data that is broken out horrible and i do not need}}, {"id": "1234567890", "timestamp": "1609459242232", "message": "Massive nested Windows event log in .xml format here", "extractedFields": {bunch of data that is broken out horrible and i do not need}}, {"id": "1234567890", "timestamp": "1609459242232", "message": "warning, mail, Jan 1 00:00:41, servername, postfix/smtpd[1111]:, warning: log message is here - more info", "extractedFields": {bunch of data that is broken out horrible and i do not need}}, {"id": "1234567890", "timestamp": "1609459242232", "message": "another windows event log in xml format here", "extractedFields": {bunch of data that is broken out horrible and i do not need}}]}

Currently I am able to split out all of the nested log events and match and parse the first log format from above, but any log that does not match comes in as raw text. Is the best way forward to create a custom dissect filter for every possible log format using regex? If so, I am unsure how to parse the Windows event logs manually with Logstash rather than Winlogbeat. Is there a way to forward the matching Windows logs to Winlogbeat? Any assistance is appreciated.

My logstash configuration is currently like this:

input {
	file {
		path => "/path/to/files"
		codec => "json"
		start_position => "beginning"
		sincedb_path => "/dev/null"
		max_open_files => 50000
		mode => "read"
		exit_after_read => true
		file_completed_action => log
		file_completed_log_path => "/dev/null"
	}
}
filter {
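	# The json codec on the input already decodes each line, so this
	# filter does nothing unless a top-level "message" field exists.
	json {
		source => "message"
	}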
	split {
		field => "logEvents"
		add_tag => ["split_logs"]
	}
	dissect {
		# "mapping" is a hash option, so it needs "=>" before the brace
		mapping => {
			"[logEvents][message]" => "%{log_level},%{log_type},%{log_date},%{log_host},%{log_context},%{log_content}"
		}
	}
	mutate {
		remove_field => "[logEvents][extractedFields]"
	}
	date {
		match => ["[logEvents][timestamp]","UNIX_MS"]
		timezone => "UTC"
		locale => "en"
		target => "@timestamp"
	}
}
output {
	elasticsearch {
		hosts => ["elasticip:port"]
		index => "index-name"
	}
}
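
For reference, the per-format branching I am describing might look roughly like the sketch below. It is only an outline, not something I have running: it assumes the Windows events arrive as XML text beginning with "<Event", the target field name winlog_xml and the windows_xml tag are made up for illustration, and each of the remaining formats would need its own "else if" branch.

```
filter {
	# Assumption: Windows event log entries are XML strings starting with "<Event"
	if [logEvents][message] =~ /^\s*<Event/ {
		xml {
			source => "[logEvents][message]"
			# Hypothetical field name for the parsed XML
			target => "winlog_xml"
			store_xml => true
			force_array => false
		}
		mutate {
			add_tag => ["windows_xml"]
		}
	} else {
		# Fallback: the comma-separated syslog-style format from the example
		dissect {
			mapping => {
				"[logEvents][message]" => "%{log_level},%{log_type},%{log_date},%{log_host},%{log_context},%{log_content}"
			}
			# Events that match no known format keep this tag, so they can be found later
			tag_on_failure => ["_dissectfailure"]
		}
	}
}
```

This block would go after the split filter, so that each event carries a single [logEvents][message] string by the time the conditional runs.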
