RFC-5424 log parsing

Hi,

I'm sending logs from SentinelOne to Logstash in what they call RFC-5424 format, and I'm not sure how to handle them.

I began by splitting the fields with grok, and now my issue is how to handle the following part. In each bracketed element, the goal is to use the name after the space as the key and the data after the equals sign as the value (some logs may have more or fewer fields). The final output is JSON.
Do you have any suggestions?

[activityType@53163 activityType="5125"][activityId@53163 activityId="623003754907567206"][siteId@53163 siteId="623002780243591455"][siteName@53163 siteName="mlbcjy"][accountId@53163 accountId="622999338330619906"][accountName@53163 accountName="SentinelOne"][notificationScope@53163 notificationScope="SITE"][agentId@53163 agentId="623002794101572118"][threatId@53163 threatId="-"][comments@53163 comments="-"][userId@53163 userId="-"][data.uid@53163 data.uid="Serial"][data.creator@53163 data.creator="wsnbiw"][data.osType@53163 data.osType="windows"][data.ruleId@53163 data.ruleId="623003742878303298"][data.version@53163 data.version="N/A"][data.eventId@53163 data.eventId="492c9d6a-116d-4f0f-a04c-2c609e55f62c"][data.groupId@53163 data.groupId="623002780268757280"][data.interface@53163 data.interface="USB"][data.ruleName@53163 data.ruleName="avywou"][data.ruleType@53163 data.ruleType="productId"][data.vendorId@53163 data.vendorId="3"][data.eventTime@53163 data.eventTime="2014-11-09T11:14:33.900+03:00"][data.eventType@53163 data.eventType="blocked"][data.productId@53163 data.productId="38B83"][data.scopeName@53163 data.scopeName="mlbcjy (SentinelOne) Default Group"][data.deviceName@53163 data.deviceName="My device"][data.lmpVersion@53163 data.lmpVersion="N/A"][data.minorClass@53163 data.minorClass="N/A"][data.deviceClass@53163 data.deviceClass="01h"][data.computerName@53163 data.computerName="TEST-AGENT-WIN"][data.profileUuids@53163 data.profileUuids="N/A"][data.ruleScopeName@53163 data.ruleScopeName="site mlbcjy"][data.lastLoggedInUserName@53163 data.lastLoggedInUserName="testuser"][secondaryDescription@53163 secondaryDescription="-"][description@53163 description="-"][createdAt@53163 createdAt="2019-05-10T13:57:15.009733Z"][groupId@53163 groupId="623002780268757280"][agentUpdatedVersion@53163 agentUpdatedVersion="-"][hash@53163 hash="-"][osFamily@53163 osFamily="-"][siteId@53163 siteId="623002780243591455"][updatedAt@53163 updatedAt="2019-05-10T13:57:15.009738Z"]

Here is the full log example from their documentation:

<36>1 2019-05-10T13:57:15.009733Z 192.0.2.4 SentinelOne 91ce403e-66c2-488e-a3ee-351615cf7512 623003754907567206 [activityType@53163 activityType="5125"][activityId@53163 activityId="623003754907567206"][siteId@53163 siteId="623002780243591455"][siteName@53163 siteName="mlbcjy"][accountId@53163 accountId="622999338330619906"][accountName@53163 accountName="SentinelOne"][notificationScope@53163 notificationScope="SITE"][agentId@53163 agentId="623002794101572118"][threatId@53163 threatId="-"][comments@53163 comments="-"][userId@53163 userId="-"][data.uid@53163 data.uid="Serial"][data.creator@53163 data.creator="wsnbiw"][data.osType@53163 data.osType="windows"][data.ruleId@53163 data.ruleId="623003742878303298"][data.version@53163 data.version="N/A"][data.eventId@53163 data.eventId="492c9d6a-116d-4f0f-a04c-2c609e55f62c"][data.groupId@53163 data.groupId="623002780268757280"][data.interface@53163 data.interface="USB"][data.ruleName@53163 data.ruleName="avywou"][data.ruleType@53163 data.ruleType="productId"][data.vendorId@53163 data.vendorId="3"][data.eventTime@53163 data.eventTime="2014-11-09T11:14:33.900+03:00"][data.eventType@53163 data.eventType="blocked"][data.productId@53163 data.productId="38B83"][data.scopeName@53163 data.scopeName="mlbcjy (SentinelOne) Default Group"][data.deviceName@53163 data.deviceName="My device"][data.lmpVersion@53163 data.lmpVersion="N/A"][data.minorClass@53163 data.minorClass="N/A"][data.deviceClass@53163 data.deviceClass="01h"][data.computerName@53163 data.computerName="TEST-AGENT-WIN"][data.profileUuids@53163 data.profileUuids="N/A"][data.ruleScopeName@53163 data.ruleScopeName="site mlbcjy"][data.lastLoggedInUserName@53163 data.lastLoggedInUserName="testuser"][secondaryDescription@53163 secondaryDescription="-"][description@53163 description="-"][createdAt@53163 createdAt="2019-05-10T13:57:15.009733Z"][groupId@53163 groupId="623002780268757280"][agentUpdatedVersion@53163 agentUpdatedVersion="-"][hash@53163 hash="-"][osFamily@53163 osFamily="-"][siteId@53163 siteId="623002780243591455"][updatedAt@53163 updatedAt="2019-05-10T13:57:15.009738Z"] USB device My device was blocked on TEST-AGENT-WIN because of rule avywou in site mlbcjy.

Thank you!

I believe RFC-5424 is a syslog format. Have you tried creating a syslog input and sending the data into that directly from their syslog generator?

Also check this out:
https://rfc5424-logging-handler.readthedocs.io/en/latest/syslog_server.html

According to the documentation, the syslog input does not support RFC-5424: "This input only supports RFC3164 Syslog".

Therefore, I tried the solution suggested in "Logstash and RFC5424" from the RFC5424 logging handler 1.4.3 documentation. It seems to parse the data, but the output has the "_grokparsefailure_sysloginput" tag.
I also tried a TCP input (instead of the syslog input, which doesn't support the certificates required in our environment). It parses the data as well, but the events still carry the "_grokparsefailure" tag.

Any idea what the issue is?

What does your pipeline configuration look like?

The tag _grokparsefailure means that some of your messages do not match your grok pattern. You need to fix the pattern or add other grok filters to deal with those messages.
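To see whether a given line has the overall RFC 5424 shape, it can help to test it outside Logstash first. The following is a minimal Ruby sketch, not the grok pattern itself: `RFC5424_LINE` and `parse_rfc5424_line` are names made up for illustration, the regex is a simplified stand-in that accepts any non-space token for each header field, and the sample is a shortened version of the SentinelOne line from this thread.

```ruby
# Simplified stand-in for the RFC 5424 layout:
# <PRI>VERSION TIMESTAMP HOST APP PROCID MSGID STRUCTURED-DATA [MSG]
RFC5424_LINE = /\A<(?<pri>\d{1,3})>(?<version>\d{1,2}) (?<timestamp>\S+) (?<host>\S+) (?<app>\S+) (?<procid>\S+) (?<msgid>\S+) (?<sd>-|(?:\[.*?[^\\]\])+)(?: (?<msg>.*))?\z/m

# Returns a hash of the named captures, or nil when the line does not match.
def parse_rfc5424_line(line)
  m = RFC5424_LINE.match(line.chomp)
  m && m.named_captures
end

# Shortened version of the sample line from this thread.
rfc5424_sample = '<36>1 2019-05-10T13:57:15.009733Z 192.0.2.4 SentinelOne ' \
                 '91ce403e-66c2-488e-a3ee-351615cf7512 623003754907567206 ' \
                 '[activityType@53163 activityType="5125"][siteName@53163 siteName="mlbcjy"] ' \
                 'USB device My device was blocked on TEST-AGENT-WIN because of rule avywou in site mlbcjy.'

# An RFC 3164 style line, which has no version field after the priority.
rfc3164_sample = "<34>Oct 11 22:14:15 mymachine su: failed"

p parse_rfc5424_line(rfc5424_sample)
p parse_rfc5424_line(rfc3164_sample)
```

A line that returns nil here is a likely candidate for the ones that end up tagged with _grokparsefailure, although the real grok pattern may be stricter or looser than this stand-in.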

I do not know SentinelOne, but can you change the format in which the logs are shipped? Does it have the option to use CEF or something like that? The format of the message you shared can take a lot of work to parse correctly, so if you have the option, change it to a better format.

You could try

    dissect { mapping => { "message" => "<%{syslog_pri}>%{} %{[@metadata][ts]} %{someIp} %{} %{} %{} %{allThatStuff}] %{restOfLine}" } }
    mutate { gsub => [ "allThatStuff", "[.\w]+@\d+", "" ] }
    mutate { gsub => [ "allThatStuff", "^\[", "" ] }
#    mutate { gsub => [ "allThatStuff", "data\.", "" ] }
    kv { source => "allThatStuff" field_split_pattern => "\]\[" trim_key => " " }

which results in

               "restOfLine" => "USB device My device was blocked on TEST-AGENT-WIN because of rule avywou in site mlbcjy.",
                  "agentId" => "623002794101572118",
           "data.interface" => "USB",
            "data.ruleName" => "avywou",
              "accountName" => "SentinelOne",
       "data.ruleScopeName" => "site mlbcjy",
                   "siteId" => [
    [0] "623002780243591455",
    [1] "623002780243591455"
],

etc.
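To make the gsub and kv steps easier to follow, here is a rough Ruby sketch of the same idea outside Logstash. It is only an approximation of what the filters do, and `parse_structured_pairs` is a made-up helper name. It assumes that no value contains "][" or text shaped like "name@digits", and it needs Ruby 2.5 or later for `delete_prefix`.

```ruby
# Approximates: gsub "[.\w]+@\d+" -> "", gsub "^\[" -> "", then kv split on "][".
def parse_structured_pairs(all_that_stuff)
  cleaned = all_that_stuff.gsub(/[.\w]+@\d+/, "").sub(/\A\[/, "")
  result = {}
  cleaned.split("][").each do |pair|
    key, value = pair.split("=", 2)
    next if value.nil?                      # not a key=value pair
    key = key.strip                         # like trim_key => " "
    value = value.delete_prefix('"').delete_suffix('"')
    if result.key?(key)
      # A repeated key is collected into an array, as with siteId above.
      result[key] = Array(result[key]) << value
    else
      result[key] = value
    end
  end
  result
end

# Shortened stand-in for the allThatStuff field (dissect has already
# consumed the final "]").
kv_sample = '[siteId@53163 siteId="623002780243591455"]' \
            '[data.interface@53163 data.interface="USB"]' \
            '[data.scopeName@53163 data.scopeName="mlbcjy (SentinelOne) Default Group"]' \
            '[siteId@53163 siteId="623002780243591455"'

p parse_structured_pairs(kv_sample)
```

The repeated siteId element is why that field comes out as a two-element array in the output above.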

Interesting. I have that inline Ruby code working happily, parsing RFC5424 entries that arrive over a plain TCP connection from rsyslog. I'd prefer to use RELP, but I haven't got around to that yet, nor to moving the internals over to mTLS.

  tcp {
    id => "syslog-rfc5424-tcp"
    type => "syslog"
    port => 2514
    ecs_compatibility => "v1"

    tags => ["staging", "syslog", "rfc5424", "tcp"]
  }

and then in the filter we have

filter {
  # rewrite rfc5424
  if "rfc5424" in [tags] {
    grok {
      id => "grok_convert_rfc5424"
      match => {
        "message" => "<%{NONNEGINT:syslog_pri}>%{NONNEGINT:version}%{SPACE}(?:-|%{TIMESTAMP_ISO8601:syslog_timestamp})%{SPACE}(?:-|%{IPORHOST:hostname})%{SPACE}(?:%{SYSLOG5424PRINTASCII:program}|-)%{SPACE}(?:-|%{SYSLOG5424PRINTASCII:process_id})%{SPACE}(?:-|%{SYSLOG5424PRINTASCII:message_id})%{SPACE}(?:-|(?<structured_data>(\[.*?[^\\]\])+))(?:%{SPACE}%{GREEDYDATA:syslog_message}|)"
      }
      add_tag => [ "match" ]
    }
    if "match" in [tags] {
      syslog_pri {
        remove_field => "syslog_pri"
      }
      date {
        match => [ "syslog_timestamp", "ISO8601", "MMM dd HH:mm:ss", "MMM dd HH:mm:ss.SSS" ]
        remove_field => "syslog_timestamp"
      }
      if [structured_data] {
        ruby {
          id => "ruby_extract_structured_data_rfc5424"
          code => '
              # https://github.com/logstash-plugins/logstash-input-syslog/issues/15#issuecomment-270367033
              def extract_syslog5424_sd(syslog5424_sd)
                  sd = {}
                  syslog5424_sd.scan(/\[(?<element>.*?[^\\])\]/) do |element|
                      data = element[0].match(/(?<sd_id>[^\ ]+)(?<sd_params> .*)?/)
                      # Skip elements that do not parse, before touching data.
                      next if data.nil?
                      sd_id = data[:sd_id].split("@", 2)[0]
                      sd[sd_id] = {}
                      next if data[:sd_params].nil?
                      data[:sd_params].scan(/ (.*?[=](?:""|".*?[^\\]"))/) do |set|
                          set = set[0].match(/(?<param_name>.*?)[=]\"(?<param_value>.*)\"/)
                          sd[sd_id][set[:param_name]] = set[:param_value]
                      end
                  end
                  sd
              end
              event.set("[sd]", extract_syslog5424_sd(event.get("[structured_data]")))
          '
          remove_field => "structured_data"
        }
      }
    }
  }
}
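For the SentinelOne sample earlier in the thread, note that every element carries a single parameter named after the element itself, so this helper would produce nested fields such as [sd][siteName][siteName]. The sketch below runs a standalone copy of the helper (same regexes, with the nil check moved before the first use of `data`) in plain Ruby. The `flatten_sd` step is a hypothetical addition, not part of the linked snippet, and it assumes parameter names do not collide across elements.

```ruby
# Standalone copy of the helper used in the ruby filter above.
def extract_syslog5424_sd(syslog5424_sd)
  sd = {}
  syslog5424_sd.scan(/\[(?<element>.*?[^\\])\]/) do |element|
    data = element[0].match(/(?<sd_id>[^\ ]+)(?<sd_params> .*)?/)
    next if data.nil?
    sd_id = data[:sd_id].split("@", 2)[0]   # drop the "@53163" suffix
    sd[sd_id] = {}
    next if data[:sd_params].nil?
    data[:sd_params].scan(/ (.*?[=](?:""|".*?[^\\]"))/) do |set|
      set = set[0].match(/(?<param_name>.*?)[=]\"(?<param_value>.*)\"/)
      sd[sd_id][set[:param_name]] = set[:param_value]
    end
  end
  sd
end

# Hypothetical extra step: merge all parameters into one flat hash.
# Later elements overwrite earlier ones that share a parameter name.
def flatten_sd(sd)
  flat = {}
  sd.each_value { |params| flat.merge!(params) }
  flat
end

sd_sample = '[siteName@53163 siteName="mlbcjy"]' \
            '[data.scopeName@53163 data.scopeName="mlbcjy (SentinelOne) Default Group"]' \
            '[threatId@53163 threatId="-"]'

nested = extract_syslog5424_sd(sd_sample)
p nested
p flatten_sd(nested)
```

Inside the ruby filter, the flat hash could be written with event.set in the same way the nested one is, but whether flattening is wanted depends on how the fields are used downstream.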

Then rsyslog is configured like this:

# RFC5424 format - RSYSLOG_SyslogProtocol23Format
*.* action(type="omfwd" target="syslog" Template="RSYSLOG_SyslogProtocol23Format" port="2514" protocol="tcp" action.resumeRetryCount="100" queue.type="linkedList" queue.size="10000")

I've also tried it via UDP and that worked fine as well.

I'm not sure what causes the issue, but I just used the "match" tag in the output to skip events that failed grok, and it seems to work :)

Thank you all!