CEF plugin is not working as expected

Hi,

I'm trying to configure Logstash to work with SentinelOne logs, but it is not working as expected: it looks like the fields are not being separated correctly.

I configured the input to be CEF and the output to be json_lines:

input {
    tcp {
      port => "15518"
      ssl_enable => true
      ssl_verify => false
      ssl_cert => "***"
      ssl_key => "***"
      type => "sentinelone"
      codec => cef
    }
}

output {
  if [type] == "sentinelone" {
     file {
      path => "/var/log/test_logstash_s1.log"
      codec => json_lines
    }
   }
}

This is an example of the output I receive:

{"syslog":"<14>2021-10-06 11:38:36,895 sentinel - ","deviceVendor":"SentinelOne","port":17233,"deviceEventClassId":"fileName=XXX","deviceReceiptTime":"2021-10-06 11:38:28.029191|deviceAddress=18.195.251.162|deviceHostFqdn=XXX|deviceHostName=XXX|notificationScope=SITE|siteId=XXX|siteName=XXX|accountId=XXX|accountName=XXX|vendor=SentinelOne|eventID=2030|eventDesc=Analyst verdict changed|eventSeverity=1|originatorName=XXX|originatorVersion=21.6.3.7|sourceAgentLastActivityTimestamp=2021-10-06 11:38:16.822000|sourceAgentRegisterTimestamp=2021-10-05 10:38:53.014026|sourceNetworkState=connected|sourceOsRevision=Ubuntu 18.04.5 LTS 4.15.0-136-generic|sourceOsType=linux|sourceAgentUuid=XXX|sourceFqdn=XXX|sourceThreatCount=0|sourceMgmtPrecievedAddress=XXX|sourceDnsDomain=XXX|sourceHostName=XXX|sourceUserName=|sourceUserId=|sourceAgentId=XXX|sourceGroupId=472773934044847262|sourceGroupName=Default Group|sourceIpAddresses=['XXX', 'XXX', 'XXX', 'XXX']|sourceMacAddresses=['XXX', 'XXX', 'XXX', 'XXX']|threatClassification=Malware|threatClassificationSource=Engine|threatDetectingEngine=linux.static|threatClassifier=None|threatMitigationStatus=marked_as_benign|threatConfidenceLevel=suspicious|threatMitigatedPreemptively=False|threatMitigationStatusLabel=suspicious_resolved|threatMitigationStatusID=5|threatCommandLineArguments=|threatID=1260285295206610793|threatStoryline=01d69a9b-4244-89fb-de08-b619c9256293|threatDetectionTime=2021-10-05 20:41:54.674742|threatIndicatorsList=|threatProcessUser=|fileHashSha256=None|fileHashMd5=None|cat=THREATMANAGEMENT|activityID=1260736546593181249|activityType=2030","host":"XXX","cefVersion":"2","@timestamp":"2021-10-06T11:38:37.481Z","deviceProduct":"Mgmt","type":"sentinelone","@version":"1","deviceVersion":"suser=XXX","name":"oldValue=Undefined","severity":"newValue=False positive"}

What am I missing here? Why does it treat multiple fields as one?

Thank you :slight_smile:

Could you remove the codec => cef and paste the output? I'm curious about the structure of the incoming messages without any processing.

Sure,

{"port":22972,"@version":"1","type":"sentinelone","@timestamp":"2021-10-06T13:23:42.646Z","host":"XXX","message":"<14>2021-10-06 13:23:42,535 sentinel - CEF:2|SentinelOne|Mgmt|Linux|fileHash= XXX |filePath= XXX |fileName= XXX |deviceAddress= XXX |deviceHostFqdn= XXX |deviceHostName= XXX |notificationScope=SITE|siteId= XXX |siteName= XXX |accountId=472773930102201398|accountName= XXX |vendor=SentinelOne|eventID=4008|eventDesc=Threat status changed|eventSeverity=1|originatorName=node2|originatorVersion=21.6.3.7|sourceAgentLastActivityTimestamp=2021-10-06 13:22:52.566550|sourceAgentRegisterTimestamp=2021-10-05 12:47:56.818160|sourceNetworkState=connected|sourceOsRevision=Ubuntu 18.04.5 LTS 4.15.0-136-generic|sourceOsType=linux|sourceAgentUuid=90d77435-6aad-b2e2-61c3-5097e37d1cd0|sourceFqdn= XXX |sourceThreatCount=0|sourceMgmtPrecievedAddress= XXX |sourceDnsDomain= XXX |sourceHostName= XXX |sourceUserName=|sourceUserId=|sourceAgentId= XXX |sourceGroupId= XXX |sourceGroupName=Default Group|sourceIpAddresses=['XXX', 'XXX', 'XXX', 'XXX']|sourceMacAddresses=['XXX', 'XXX', ,XXX', 'XXX']|threatClassification=Malware|threatClassificationSource=Engine|threatDetectingEngine=linux.static|threatClassifier=None|threatMitigationStatus=marked_as_benign|threatConfidenceLevel=suspicious|threatMitigatedPreemptively=False|threatMitigationStatusLabel=suspicious|threatMitigationStatusID=3|threatCommandLineArguments=|threatID=1260136251988460626|threatStoryline=f43477e8-319b-a5b4-184d-78b1a002bf95|threatDetectionTime=2021-10-05 15:45:47.333056|threatIndicatorsList=|threatProcessUser=|fileHashSha256=None|fileHashMd5=None|cat=THREATMANAGEMENT|rt=2021-10-05 15:45:47.333056|activityID=1260789455462837914|activityType=4008"}

What I wanted to check is whether it is actually CEF coming across, which would be a single line containing what's in your message field. That part looks correct.

But it looks like the format is not what the codec expects, and that's why it's breaking.

Base Format
CEF:Version|Device Vendor|Device Product|Device Version|Signature ID|Name|Severity|Extension

What it should look like
Mar 19 15:19:15 root CEF:0|Trend Micro|Deep Security Agent|<DSA version>|123|Out Of Allowed Policy|5|

What yours looks like
<14>2021-10-06 13:23:42,535 sentinel - CEF:2|SentinelOne|Mgmt|Linux|

It's missing some of the base fields, and because pipes appear later in your message, the codec is most likely treating the first few key=value pairs as those missing base fields.
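To make that concrete, here is a small Ruby sketch that reads the header the way the base format quoted above suggests: the first seven pipe-separated values are header fields and everything after them is the extension. It is only an illustration of the layout, not the CEF codec's actual parser (it ignores escaped pipes, for example), and `naive_cef_split` is a hypothetical helper name.

```ruby
# Names of the seven CEF header fields plus the trailing extension,
# following the base format quoted above.
CEF_FIELDS = %w[cefVersion deviceVendor deviceProduct deviceVersion
                deviceEventClassId name severity extension].freeze

# Naively split everything after "CEF:" on the first seven pipes.
# Illustration only: this is NOT how the codec is implemented.
def naive_cef_split(line)
  body = line[/CEF:(.*)/m, 1]
  return {} if body.nil?

  CEF_FIELDS.zip(body.split("|", CEF_FIELDS.size)).to_h
end

# Shortened version of the message from this thread.
sample = "<14>2021-10-06 13:23:42,535 sentinel - CEF:2|SentinelOne|Mgmt|Linux|" \
         "fileHash= XXX |filePath= XXX |fileName= XXX |deviceAddress= XXX |siteId= XXX"

naive_cef_split(sample).each { |field, value| puts "#{field}: #{value.inspect}" }
```

With this message, the slots for Signature ID, Name and Severity end up holding `fileHash=...`, `filePath=...` and `fileName=...`, and the rest of the pipe-delimited pairs land in one extension string.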

What it should look like
src=10.52.116.160 suser=admin target=admin

What yours looks like
fileHash= XXX |filePath= XXX |fileName= XXX

The delimiters are not standard.

If you are unable to change the message source to the format that the Logstash CEF codec requires, then using dissect or grok to parse the message might be the better route.

Thank you!
I probably won't be able to change the log, but I thought of splitting it in two using grok and then using kv for everything after:

<14>2021-10-06 13:23:42,535 sentinel - CEF:2|SentinelOne|Mgmt|Linux

Do you think it will work?

I still think grok is the way to go, but I tried kv.

First I had to run kv to split on the pipes, then kv again to split on the equal signs, and then remove every field whose name contained a pipe.

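filter {
  # First pass: split the message on pipes.
  kv {
    field_split => "|"
  }
  # Second pass: split on equal signs.
  kv {
    field_split => "="
  }
  # Drop every field whose name still contains a pipe.
  ruby {
    code => '
      event.to_hash.each_key { |k|
        event.remove(k) if k.include?("|")
      }
    '
  }
}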

This still left one warning:

Exception while parsing KV {:exception=>"Invalid FieldReference: `['XXX', 'XXX', 'XXX', 'XXX']|sourceMacAddresses`"}
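The field name in that warning has a recognisable shape: the tail of one value glued to the name of the next field. The plain-Ruby sketch below shows how cutting a fragment of the message on "=" alone produces chunks like that, and how splitting on "|" first and then on the first "=" keeps each pair intact. It only demonstrates the string splitting and makes no claim about how the kv filter works internally.

```ruby
# Fragment of the message from this thread.
fragment = "sourceIpAddresses=['XXX', 'XXX']|sourceMacAddresses=['XXX', 'XXX']|cat=THREATMANAGEMENT"

# Splitting on "=" alone cuts through the middle of each key=value pair,
# so a chunk mixes one field's value with the next field's name.
chunks = fragment.split("=")
chunks.each { |chunk| puts chunk.inspect }

# Splitting on "|" first, then on the first "=" only, keeps pairs together.
pairs = fragment.split("|").map { |pair| pair.split("=", 2) }
pairs.each { |key, value| puts "#{key} -> #{value}" }
```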

Thanks again for your help. I thought about the following:

filter {
  kv {
    field_split => "|"
    value_split => "="
  }
}

But I'm not sure how it will work with fields such as:

sourceIpAddresses=['XXX', 'XXX', 'XXX', 'XXX']

If it would be able to handle them, I'd like to convert them to JSON as well, but I'm not sure if there is an option for that :thinking:

I started a grok pattern that you can continue if you want to go this route. I used WORD because the values are hidden, but you can most likely use a more specific pattern where it fits (UNIXPATH or WINPATH, for example).

grok {
 match => { "message" => "fileHash=%{WORD:fileHash}\|filePath=%{WORD:filePath}\|fileName=%{WORD:fileName}\|deviceAddress=%{WORD:deviceAddress}\|deviceHostFqdn=%{WORD:deviceHostFqdn}" }
}

The issue is that the fields are not consistent; they depend on the log type, so I'm not sure grok is good enough.

You could try

    dissect {
        mapping => { "message" => "<%{syslog_pri}>%{[@metadata][ts]} %{+[@metadata][ts]} %{} - %{}|%{}|%{}|%{}|%{[@metadata][restOfLine]}" }
        remove_field => [ "message" ]
    }
    date { match => [ "[@metadata][ts]", "YYYY-MM-dd HH:mm:ss,SSS" ] }
    kv { source => "[@metadata][restOfLine]" field_split => "|" trim_value => " " }
    mutate { gsub => [ "sourceMacAddresses", "[\[\]' ]", "", "sourceIpAddresses", "[\[\]' ]", "" ] }
    mutate { split => { "sourceMacAddresses" => "," "sourceIpAddresses" => "," } }

which will get you

               "sourceIpAddresses" => [
    [0] "XXX",
    [1] "XXX",
    [2] "XXX",
    [3] "XXX"
],
                       "accountId" => "472773930102201398",
                      "syslog_pri" => "14",
                 "sourceAgentUuid" => "90d77435-6aad-b2e2-61c3-5097e37d1cd0",
                        "fileName" => "XXX",
                      "@timestamp" => 2021-10-06T17:23:42.535Z,
                   "deviceAddress" => "XXX",
     "threatMitigatedPreemptively" => "False",
                  "deviceHostFqdn" => "XXX",

etc.
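For anyone who wants to see the logic outside Logstash, here is a rough plain-Ruby equivalent of the kv and list-handling steps. It is a sketch, not a replacement for the filters: `parse_rest_of_line` and `LIST_FIELDS` are hypothetical names, dropping empty values is a choice made in this sketch, and the IP addresses in the example are placeholders. Because it does not hard-code field names (apart from the two list fields), it copes with log types that carry different fields. The last line shows the list as JSON.

```ruby
require "json"

# Fields whose values look like ['a', 'b'] and should become arrays.
LIST_FIELDS = %w[sourceIpAddresses sourceMacAddresses].freeze

# Turn "k1= v1 |k2=v2|..." into a hash: split on pipes, then on the first "=".
def parse_rest_of_line(rest)
  rest.split("|").each_with_object({}) do |pair, event|
    key, value = pair.split("=", 2)
    next if value.nil?        # chunk without "=", nothing to store

    key = key.strip
    value = value.strip       # same idea as trim_value => " "
    next if value.empty?      # sketch choice: skip empty values

    event[key] =
      if LIST_FIELDS.include?(key)
        # Strip brackets, quotes and spaces, then split on commas.
        value.gsub(/[\[\]' ]/, "").split(",")
      else
        value
      end
  end
end

rest = "fileName= XXX |sourceUserName=|sourceGroupName=Default Group|" \
       "sourceIpAddresses=['10.0.0.1', '10.0.0.2']"
event = parse_rest_of_line(rest)
puts JSON.generate(event["sourceIpAddresses"])
```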

Thank you for your suggestions!
I'll keep digging into that, as every log seems to have different fields.

I'm also trying to work with the second format they provided... seems to be challenging as well:

Another option, if they can't provide a format that Logstash will process, could be pipeline-to-pipeline communication.

The CEF codec works in the input, so you can't transform the message to be compliant and then parse it within the same pipeline. The idea is to have one pipeline that reads the input and uses filters to transform the message to be CEF compliant. It then outputs to a second pipeline, where you can apply the CEF codec and it will process the message properly.
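As a starting point for the transformation step, here is a plain-Ruby sketch of the string rewrite, which could be adapted into a ruby filter. It rests on assumptions: borrowing eventID, eventDesc and eventSeverity for the three missing header fields is a guess based on the sample message rather than vendor guidance, `to_standard_cef` and the two escape helpers are hypothetical names, and the result has not been run through the CEF codec.

```ruby
# Escape characters that are special inside a CEF header field (\ and |).
def esc_header(value)
  value.gsub(/[\\|]/) { |c| "\\#{c}" }
end

# Escape characters that are special inside a CEF extension value (\ and =).
def esc_ext(value)
  value.gsub(/[\\=]/) { |c| "\\#{c}" }
end

# Rebuild a SentinelOne-style line in the base CEF layout:
# seven header fields, then space-separated key=value pairs.
def to_standard_cef(line)
  prefix, body = line.split("CEF:", 2)
  return line if body.nil? # not a CEF line, leave it untouched

  version, vendor, product, dev_version, rest = body.split("|", 5)

  # Pipe-delimited pairs -> hash, trimming the padding around keys and values.
  ext = rest.to_s.split("|")
            .map { |chunk| chunk.split("=", 2).map(&:strip) }
            .select { |pair| pair.size == 2 }
            .to_h

  # ASSUMPTION: these three extension keys stand in for Signature ID, Name, Severity.
  missing = %w[eventID eventDesc eventSeverity].map { |k| esc_header(ext[k].to_s) }
  header = [version, vendor, product, dev_version, *missing].join("|")
  extension = ext.map { |k, v| "#{k}=#{esc_ext(v)}" }.join(" ")

  "#{prefix}CEF:#{header}|#{extension}"
end

sample = "<14>2021-10-06 13:23:42,535 sentinel - CEF:2|SentinelOne|Mgmt|Linux|" \
         "fileName= XXX |eventID=4008|eventDesc=Threat status changed|" \
         "eventSeverity=1|sourceGroupName=Default Group"
puts to_standard_cef(sample)
```

The syslog prefix is passed through unchanged, and lines without a "CEF:" marker are returned as they are, so the rewrite only touches messages it recognises.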