ECS category & type vs Grok

Hi! This seems like it should be a simple question but I'm having trouble finding relevant documentation, so I appreciate any advice y'all might have.

I'm trying to write filters to handle OpenSSH log data, and I'd like to map it all to ECS.

The problem, I think, shows up immediately: Each SSH log even has a different event.category and event.type.

Given grok patterns like:

SSHD_SUCCESS        (?<sshd.result>Accepted) %{WORD:sshd.auth_type} for %{USERNAME:[client][user][name]} from %{IP:[client][ip]} port %{NUMBER:[client][port]} %{WORD:sshd.protocol}: %{GREEDYDATA:sshd.cipher}

...

SSHD_NORMAL_LOG %{SSHD_SUCCESS}|%{SSHD_OTHER}|%{SSHD_MORE}
SSHD_LOG %{SSHD_NORMAL_LOG}|%{SSHD_OTHER_CATEGORY}|%{SSHD_ETC}

How do I ensure that an event that matched SSHD_SUCCESS gets identified as authentication & start?

The only solution I can see is to write filters that specifically match on %{SSHD_SUCCESS} instead of %{SSHD_LOG}, and then set categorization fields and the like that way, but this seems like a lot of trouble.

You may use a conditional later to create the fields.

if [sshd][result] == "Accepted" {
    mutate {
        add_field => {
            "[event][category]" => "authentication"
            "[event][type]" => "start"
        }
    }
}

Also, in logstash you should use [sshd][result] and not sshd.result.

yees. There are about fifty distinct events. I'd need fifty such conditionals?

(I mean, I agree that this seems like it would work, but it seems extremely unwieldy and for that reason I'm concerned about maintainability.)

You may not need fifty conditionals, but this depends on how are your events, what you want to categorize, which field you are going to use, what are the values.

Another way would be using a translate filter paired with a json filter.

For example, if you will categorize based on the value of the field sshd.result and this field can have the following values:

  • accepted
  • denied
  • unknown

You could create the following yml dictionary.

"accept": '{"category": "authentication", "kind": "event", "type": "start", "outcome": "success"}'
"denied": '{"category": "authentication", "kind": "event", "type": "start", "outcome": "failure"}'
"unknown": '{"category": "authentication", "kind": "event", "type": "start", "outcome": "unknown"}'

Then you would configure the following translate filter:

translate {
    source => "[sshd][result]"
    target => "[@metadata][events]"
    dictionary_path => "/path/to/the/dict/file.yml"
    refresh_interval => 300
}

In case of a matching key it will create the field [@metadata][events] with the value of the key in the dictionary.

For sshd.result = accepted , it will create @metadata.events = {"category": "authentication", "kind": "event", "type": "start", "outcome": "success"}

Now you can use a json filter to parse the @metadata.events field into the event field.

json {
    source => "[@metadata][events]"
    target => "event"
}

You can use as many translate filters you want and with the advantage of making it easier to mantain, since it will refresh based on the value of refresh_interval, any changes in the dictionary will be applied.

This topic was automatically closed 28 days after the last reply. New replies are no longer allowed.