Security Logs from S3 Bucket

Good Morning,

We currently have a rather unusual situation: every 8 hours, a series of security logs (Windows event logs, syslog, and a few others) is written to a single file in an S3 bucket, and we need to ingest them into the Elastic SIEM and run detections on them. An example of one of the logs is below; it is just a coincidence that it happens to be a Logstash error (from a different environment):

   ` {"MsgClassTypeId":"3000","Direction":"0","ImpactedZoneEnum":"0","message":"<30>Feb 13 23:45:24 xx.xx.xx.xx Account=\"\" Action=\"\" Aggregate=\"False\" Amount=\"\" Archive=\"True\" BytesIn=\"\" BytesOut=\"\" CollectionSequence=\"825328\" Command=\"\" CommonEventId=\"3\" CommonEventName=\"General Operations\" CVE=\"\" DateInserted=\"2/13/2021 11:45:24 PM\" DInterface=\"\" DIP=\"\" Direction=\"0\" DirectionName=\"Unknown\" DMAC=\"\" DName=\"\" DNameParsed=\"\" DNameResolved=\"\" DNATIP=\"\" DNATPort=\"-1\" Domain=\"\" DomainOrigin=\"\" DPort=\"-1\" DropLog=\"False\" DropRaw=\"False\" Duration=\"\" EntityId=\"" EventClassification=\"-1\" EventCommonEventID=\"-1\" FalseAlarmRating=\"0\" Forward=\"False\" ForwardToLogMart=\"False\" GLPRAssignedRBP=\"-1\" Group=\"\" HasBeenInserted_EMDB=\"False\" HasBeenQueued_Archiving=\"True\" HasBeenQueued_EventProcessor=\"False\" HasBeenQueued_LogProcessor=\"True\" Hash=\"\" HostID=\"44\" IgnoreGlobalRBPCriteria=\"False\" ImpactedEntityId=\"0\" ImpactedEntityName=\"\" ImpactedHostId=\"-1\" ImpactedHostName=\"\" ImpactedLocationKey=\"\" ImpactedLocationName=\"\" ImpactedNetworkId=\"-1\" ImpactedNetworkName=\"\" ImpactedZoneEnum=\"0\" ImpactedZoneName=\"\" IsDNameParsedValue=\"True\" IsRemote=\"True\" IsSNameParsedValue=\"True\" ItemsIn=\"\" ItemsOut=\"\" LDSVERSION=\"1.1\" Login=\"\" LogMartMode=\"13627389\" LogSourceId=\"158\" LogSourceName=\"ip-xx-xx-xx-xx.eu-west-2.computer.internal Linux Syslog\" MediatorMsgID=\"0\" MediatorSessionID=\"1640\" MsgClassId=\"3999\" MsgClassName=\"Other Operations\" MsgClassTypeId=\"3000\" MsgClassTypeName=\"Operations\" MsgCount=\"1\" MsgDate=\"2021-02-13T23:45:24.0000000+00:00\" MsgDateOrigin=\"0\" MsgSourceHostID=\"44\" MsgSourceTypeId=\"88\" MsgSourceTypeName=\"Syslog - Linux Host\" NormalMsgDate=\"2021-02-13T23:45:24.0540000Z\" Object=\"\" ObjectName=\"\" ObjectType=\"\" OriginEntityId=\"0\" OriginEntityName=\"\" OriginHostId=\"-1\" OriginHostName=\"\" OriginLocationKey=\"\" 
OriginLocationName=\"\" OriginNetworkId=\"-1\" OriginNetworkName=\"\" OriginZoneEnum=\"0\" OriginZoneName=\"\" ParentProcessId=\"\" ParentProcessName=\"\" ParentProcessPath=\"\" PID=\"-1\" Policy=\"\" Priority=\"4\" Process=\"\" ProtocolId=\"-1\" ProtocolName=\"\" Quantity=\"\" Rate=\"\" Reason=\"\" Recipient=\"\" RecipientIdentity=\"\" RecipientIdentityCompany=\"\" RecipientIdentityDepartment=\"\" RecipientIdentityDomain=\"\" RecipientIdentityID=\"-1\" RecipientIdentityTitle=\"\" ResolvedImpactedName=\"\" ResolvedOriginName=\"\" ResponseCode=\"\" Result=\"\" RiskRating=\"0\" RootEntityId=\"9\" Sender=\"\" SenderIdentity=\"\" SenderIdentityCompany=\"\" SenderIdentityDepartment=\"\" SenderIdentityDomain=\"\" SenderIdentityID=\"-1\" SenderIdentityTitle=\"\" SerialNumber=\"\" ServiceId=\"-1\" ServiceName=\"\" Session=\"\" SessionType=\"\" Severity=\"\" SInterface=\"\" SIP=\"\" Size=\"\" SMAC=\"\" SName=\"\" SNameParsed=\"\" SNameResolved=\"\" SNATIP=\"\" SNATPort=\"-1\" SPort=\"-1\" Status=\"\" Subject=\"\" SystemMonitorID=\"9\" ThreatId=\"\" ThreatName=\"\" UniqueID=\"7d4c4ed3-a2fc-44bc-a7ec-0b8b68e7f456\" URL=\"\" UserAgent=\"\" UserImpactedIdentity=\"\" UserImpactedIdentityCompany=\"\" UserImpactedIdentityDomain=\"\" UserImpactedIdentityID=\"-1\" UserImpactedIdentityTitle=\"\" UserOriginIdentity=\"\" UserOriginIdentityCompany=\"\" UserOriginIdentityDepartment=\"\" UserOriginIdentityDomain=\"\" UserOriginIdentityID=\"-1\" UserOriginIdentityTitle=\"\" VendorInfo=\"\" VendorMsgID=\"\" Version=\"\" RawLog=\"02 13 2021 23:45:24 xx.xx.xx.xx <SYSD:INFO> Feb 13 23:45:24 euw2-ec2--001 metricbeat[3031]: 2021-02-13T23:45:24.264Z#011ERROR#011[logstash.node_stats]#011node_stats/node_stats.go:73#011error making http request: Get \\\"https://xx.xx.xx.xx:9600/\\\": dial tcp xx.xx.xx.xx:9600: connect: connection refused\"","CollectionSequence":"825328","NormalMsgDate":"2021-02-13T23:45:24.0540000Z"}`

Each log is on its own line, so having them all in the same file is not too much of an issue. I currently have my Logstash pipeline set up as shown below (slightly condensed, as we have more than 2 log types) so that each type of log goes into a separate index based on a field called MsgSourceTypeName. Where I am confused is what to do next so that I can use the logs in Elastic Security and run the built-in detections against them. Within the message there is a field called RawLog, so I assume I'll need to extract that and parse the original log into ECS (the logs in the file carry loads of other metadata I'm not concerned with). However, I wondered if there is an easier way than custom parsing, seeing as all the logs I am pulling from the file are standard security logs. Also, if custom parsing is required, how would I go about it so that the default detections can still be run against the indexes?

filter {
  # Extract the message from the logs and decode it as JSON
  mutate {
    replace => [ "message", "%{message}" ]
  }
  json {
    source => "message"
    remove_field => "message"
  }

  # Set the target index depending on the message source type
  if [MsgSourceTypeName] == "Syslog - Linux Host" {
    mutate {
      add_field => {
        "[@metadata][target_index]" => "logs-syslog"
      }
    }
  }
  else if [MsgSourceTypeName] == "MS Windows Event Logging XML - Application" {
    mutate {
      add_field => {
        "[@metadata][target_index]" => "logs-ms_event_log_application"
      }
    }
  }
}
output {
  elasticsearch {
    hosts => ["https://xx.xx.xx.xx:9200","https://xx.xx.xx.xx:9200"]
    index => "%{[@metadata][target_index]}"
  }
}
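
To make the RawLog extraction I described more concrete, here is a rough Python sketch of what I think needs to happen to each line. It is not part of my pipeline. The function name `extract_fields`, the regular expression, and the shortened sample line are all my own assumptions for illustration; the real lines carry far more key/value pairs.

```python
import json
import re

# Key="value" pairs; a value may contain backslash-escaped quotes (\").
PAIR_RE = re.compile(r'(\w+)="((?:[^"\\]|\\.)*)"')


def extract_fields(line: str) -> dict:
    """Decode one line of the file and split its message into key/value pairs."""
    outer = json.loads(line)
    fields = {}
    for key, value in PAIR_RE.findall(outer.get("message", "")):
        # Undo the backslash escaping used inside the quoted values.
        fields[key] = re.sub(r"\\(.)", r"\1", value)
    return fields


# Shortened, hypothetical line modelled on the example above.
sample_line = json.dumps({
    "MsgClassTypeId": "3000",
    "message": (
        '<30>Feb 13 23:45:24 xx.xx.xx.xx Account="" '
        'MsgSourceTypeName="Syslog - Linux Host" '
        'RawLog="Feb 13 23:45:24 euw2-ec2--001 metricbeat[3031]: '
        'error making http request: Get \\"https://xx.xx.xx.xx:9600/\\": '
        'connection refused"'
    ),
})

fields = extract_fields(sample_line)
print(fields["MsgSourceTypeName"])
print(fields["RawLog"])
```

The RawLog value that comes out of this is the original syslog line, which is the part I would then need to get into ECS.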

Any help you can provide would be greatly appreciated, as I'm way out of my depth and am the only person in my company with any knowledge of Logstash/Elastic.

I've just been doing a little more research into this and had a thought that might make parsing the logs easier. If I were to set target_index to a name that matches the default Elastic index template for the log type, for example "logs-system.syslog-S3" for syslog, would the logs then be parsed as syslog for use within the Elastic SIEM, or would that possibly break everything?
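
From what I have read, Elastic's data stream names follow a type-dataset-namespace pattern, and Elasticsearch requires index names to be lowercase. Below is a small Python sketch of how I understand my proposed name would break down. The helper `describe_stream_name` is purely illustrative and my own invention, not anything from Elastic.

```python
def describe_stream_name(name: str) -> dict:
    """Split a name into type-dataset-namespace parts and flag uppercase characters."""
    parts = name.split("-")
    if len(parts) != 3:
        raise ValueError(f"expected type-dataset-namespace, got {name!r}")
    stream_type, dataset, namespace = parts
    return {
        "type": stream_type,
        "dataset": dataset,
        "namespace": namespace,
        # Elasticsearch rejects index names containing uppercase letters.
        "all_lowercase": name == name.lower(),
    }


info = describe_stream_name("logs-system.syslog-S3")
print(info)
```

If that reading is right, the uppercase "S3" would have to become "s3" before the name is even accepted, regardless of whether it triggers any parsing.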