Security Logs from S3 Bucket

Good Morning,

We currently have a rather strange situation in which a batch of security logs (Windows event logs, syslog, and a few others) is placed into a single file in an S3 bucket every 8 hours, and we need to ingest them into the Elastic SIEM and run detections on them. An example of one of the logs is below; it's just a coincidence that it happens to be a Logstash error (from a different environment):

   ` {"MsgClassTypeId":"3000","Direction":"0","ImpactedZoneEnum":"0","message":"<30>Feb 13 23:45:24 xx.xx.xx.xx Account=\"\" Action=\"\" Aggregate=\"False\" Amount=\"\" Archive=\"True\" BytesIn=\"\" BytesOut=\"\" CollectionSequence=\"825328\" Command=\"\" CommonEventId=\"3\" CommonEventName=\"General Operations\" CVE=\"\" DateInserted=\"2/13/2021 11:45:24 PM\" DInterface=\"\" DIP=\"\" Direction=\"0\" DirectionName=\"Unknown\" DMAC=\"\" DName=\"\" DNameParsed=\"\" DNameResolved=\"\" DNATIP=\"\" DNATPort=\"-1\" Domain=\"\" DomainOrigin=\"\" DPort=\"-1\" DropLog=\"False\" DropRaw=\"False\" Duration=\"\" EntityId=\"" EventClassification=\"-1\" EventCommonEventID=\"-1\" FalseAlarmRating=\"0\" Forward=\"False\" ForwardToLogMart=\"False\" GLPRAssignedRBP=\"-1\" Group=\"\" HasBeenInserted_EMDB=\"False\" HasBeenQueued_Archiving=\"True\" HasBeenQueued_EventProcessor=\"False\" HasBeenQueued_LogProcessor=\"True\" Hash=\"\" HostID=\"44\" IgnoreGlobalRBPCriteria=\"False\" ImpactedEntityId=\"0\" ImpactedEntityName=\"\" ImpactedHostId=\"-1\" ImpactedHostName=\"\" ImpactedLocationKey=\"\" ImpactedLocationName=\"\" ImpactedNetworkId=\"-1\" ImpactedNetworkName=\"\" ImpactedZoneEnum=\"0\" ImpactedZoneName=\"\" IsDNameParsedValue=\"True\" IsRemote=\"True\" IsSNameParsedValue=\"True\" ItemsIn=\"\" ItemsOut=\"\" LDSVERSION=\"1.1\" Login=\"\" LogMartMode=\"13627389\" LogSourceId=\"158\" LogSourceName=\"ip-xx-xx-xx-xx.eu-west-2.computer.internal Linux Syslog\" MediatorMsgID=\"0\" MediatorSessionID=\"1640\" MsgClassId=\"3999\" MsgClassName=\"Other Operations\" MsgClassTypeId=\"3000\" MsgClassTypeName=\"Operations\" MsgCount=\"1\" MsgDate=\"2021-02-13T23:45:24.0000000+00:00\" MsgDateOrigin=\"0\" MsgSourceHostID=\"44\" MsgSourceTypeId=\"88\" MsgSourceTypeName=\"Syslog - Linux Host\" NormalMsgDate=\"2021-02-13T23:45:24.0540000Z\" Object=\"\" ObjectName=\"\" ObjectType=\"\" OriginEntityId=\"0\" OriginEntityName=\"\" OriginHostId=\"-1\" OriginHostName=\"\" OriginLocationKey=\"\" OriginLocationName=\"\" OriginNetworkId=\"-1\" OriginNetworkName=\"\" OriginZoneEnum=\"0\" OriginZoneName=\"\" ParentProcessId=\"\" ParentProcessName=\"\" ParentProcessPath=\"\" PID=\"-1\" Policy=\"\" Priority=\"4\" Process=\"\" ProtocolId=\"-1\" ProtocolName=\"\" Quantity=\"\" Rate=\"\" Reason=\"\" Recipient=\"\" RecipientIdentity=\"\" RecipientIdentityCompany=\"\" RecipientIdentityDepartment=\"\" RecipientIdentityDomain=\"\" RecipientIdentityID=\"-1\" RecipientIdentityTitle=\"\" ResolvedImpactedName=\"\" ResolvedOriginName=\"\" ResponseCode=\"\" Result=\"\" RiskRating=\"0\" RootEntityId=\"9\" Sender=\"\" SenderIdentity=\"\" SenderIdentityCompany=\"\" SenderIdentityDepartment=\"\" SenderIdentityDomain=\"\" SenderIdentityID=\"-1\" SenderIdentityTitle=\"\" SerialNumber=\"\" ServiceId=\"-1\" ServiceName=\"\" Session=\"\" SessionType=\"\" Severity=\"\" SInterface=\"\" SIP=\"\" Size=\"\" SMAC=\"\" SName=\"\" SNameParsed=\"\" SNameResolved=\"\" SNATIP=\"\" SNATPort=\"-1\" SPort=\"-1\" Status=\"\" Subject=\"\" SystemMonitorID=\"9\" ThreatId=\"\" ThreatName=\"\" UniqueID=\"7d4c4ed3-a2fc-44bc-a7ec-0b8b68e7f456\" URL=\"\" UserAgent=\"\" UserImpactedIdentity=\"\" UserImpactedIdentityCompany=\"\" UserImpactedIdentityDomain=\"\" UserImpactedIdentityID=\"-1\" UserImpactedIdentityTitle=\"\" UserOriginIdentity=\"\" UserOriginIdentityCompany=\"\" UserOriginIdentityDepartment=\"\" UserOriginIdentityDomain=\"\" UserOriginIdentityID=\"-1\" UserOriginIdentityTitle=\"\" VendorInfo=\"\" VendorMsgID=\"\" Version=\"\" RawLog=\"02 13 2021 23:45:24 xx.xx.xx.xx 
<SYSD:INFO> Feb 13 23:45:24 euw2-ec2--001 metricbeat[3031]: 2021-02-13T23:45:24.264Z#011ERROR#011[logstash.node_stats]#011node_stats/node_stats.go:73#011error making http request: Get \\\"https://xx.xx.xx.xx:9600/\\\": dial tcp xx.xx.xx.xx:9600: connect: connection refused\"","CollectionSequence":"825328","NormalMsgDate":"2021-02-13T23:45:24.0540000Z"}`
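
For context, the input side of the pipeline is just the standard s3 input polling the bucket, roughly like the below, although the bucket name, prefix, and interval here are placeholders rather than our real values:

input {
   s3 {
     bucket   => "my-security-logs"   # placeholder, not the real bucket name
     prefix   => "security/"          # placeholder key prefix
     region   => "eu-west-2"
     interval => 300                  # seconds between polls for new objects
     # credentials omitted; they can come from the standard AWS credential chain
     # or be set via access_key_id / secret_access_key
   }
}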

Each log is on its own line, so having them all in the same file is not too much of an issue. I currently have my Logstash pipeline set up as shown below (slightly condensed, as we have more than two log types) so that each type of log goes into a separate index based on a field called MsgSourceTypeName. Where I'm confused is what to do next so that I can use the logs in Elastic Security and run the built-in detections against them. Within the message there is a field called RawLog, so I assume I'll need to extract that and parse the original log into ECS (the logs in the file carry a lot of other metadata I'm not concerned with); I've put a rough sketch of what I had in mind just after the pipeline config. However, I wondered whether there is an easier way than custom parsing, seeing as all the logs I'm pulling from the file are standard security logs. And if custom parsing is required, how would I go about it so that the default detections can still run against the indexes?

filter {
   # Decode each line's JSON document out of the message field
   mutate {
     replace => [ "message", "%{message}" ]
   }
   json {
     source       => "message"
     remove_field => [ "message" ]
   }

   # Set the target index in metadata depending on the Msg Source
   if [MsgSourceTypeName] == "Syslog - Linux Host" {
     mutate {
       add_field => { "[@metadata][target_index]" => "logs-syslog" }
     }
   }
   else if [MsgSourceTypeName] == "MS Windows Event Logging XML - Application" {
     mutate {
       add_field => { "[@metadata][target_index]" => "logs-ms_event_log_application" }
     }
   }
}

output {
   elasticsearch {
     hosts => ["https://xx.xx.xx.xx:9200", "https://xx.xx.xx.xx:9200"]
     index => "%{[@metadata][target_index]}"
   }
}
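
To illustrate what I mean about extracting RawLog, the rough shape of what I was considering adding after the json stage is below. The kv filter and the grok pattern are untested guesses on my part (and it assumes I keep the decoded message field instead of removing it), so treat it as a sketch rather than something that works:

filter {
   # Pull the key="value" pairs (including RawLog) out of the decoded message text.
   # Assumes the message field is kept after the json filter rather than removed.
   kv {
     source       => "message"
     include_keys => [ "MsgSourceTypeName", "RawLog" ]
   }

   # For the Linux syslog source, try to parse the original line held in RawLog.
   # The grok pattern is a placeholder guess, not validated against our data.
   if [MsgSourceTypeName] == "Syslog - Linux Host" {
     grok {
       match => {
         "RawLog" => "%{SYSLOGTIMESTAMP:timestamp} %{SYSLOGHOST:[host][hostname]} %{DATA:[process][name]}(?:\[%{POSINT:[process][pid]}\])?: %{GREEDYDATA:syslog_message}"
       }
     }
   }
}

Even if something like that works, I'd still be hand-mapping fields into ECS one log type at a time, which is the part I was hoping to avoid.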

Any help you can provide would be greatly appreciated, as I'm way out of my depth and am the only person in my company with any knowledge of Logstash/Elastic.

I've just been doing a little more research into this and had a thought that might make the parsing of the logs easier. If I were to set target_index to a name that matches the default Elastic index template for the log type, so for example setting target_index to "logs-system.syslog-S3" for syslog, would that then parse them as syslog for use within the Elastic SIEM, or would that possibly break everything?
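
To make that concrete, the only change would be in the conditional that sets the target index. The name below is just my guess at what would line up with the built-in template:

   if [MsgSourceTypeName] == "Syslog - Linux Host" {
     mutate {
       # "logs-system.syslog-S3" is my guess at a name matching the default template
       add_field => { "[@metadata][target_index]" => "logs-system.syslog-S3" }
     }
   }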
