Aggregate secure/sshd syslog events based on selected events

Thanks for highlighting that.

I'm getting there, but I'm running into an issue with the "_dateparsefailure" tag.

{
          "host" => {
        "ip" => "10.8.246.200"
    },
       "message" => "<86>Mar 24 12:13:13 localhost sshd[19655]: Accepted password for dcn from 10.8.4.23 port 55064 ssh2",
         "event" => {
        "original" => "<86>Mar 24 12:13:13 localhost sshd[19655]: Accepted password for dcn from 10.8.4.23 port 55064 ssh2"
    },
    "@timestamp" => 2023-03-24T04:13:16.145970385Z,
          "tags" => [
        [0] "_dateparsefailure"
    ],
      "@version" => "1"
}
{
          "host" => {
        "ip" => "10.8.246.200"
    },
       "message" => "<86>Mar 24 12:13:16 localhost sshd[19655]: Disconnected from 10.8.4.23 port 55064 + pam_unix(sshd:session): session closed for user dcn",
         "event" => {
        "original" => "<86>Mar 24 12:13:16 localhost sshd[19655]: pam_unix(sshd:session): session closed for user dcn"
    },
    "@timestamp" => 2023-03-24T04:13:19.238356276Z,
          "tags" => [
        [0] "_dateparsefailure"
    ],
      "@version" => "1"
}

Since the sshd[id] is the same across the first and last syslog messages, I changed the code so that it sends only 2 outputs instead of 3. Here is the updated code.

filter {
    dissect {
        mapping => { "message" => "%{[@metadata][ts]} %{[@metadata][server]} %{[@metadata][program]}: %{[@metadata][restOfLine]}" }
    }
    date { match => [ "[@metadata][ts]", "ISO8601" ] }
    if [@metadata][restOfLine] =~ /Accepted password/ {
        # First message of the session: remember who logged in and from where
        aggregate {
            task_id => "%{[@metadata][program]}"
            map_action => "create"
            code => '
                begin
                    m = event.get("message").match(/Accepted password for (\w+) from (.*) port (\d+)/)
                    map["sourceUserid"] = m[1]
                    map["sourceIp"] = m[2]
                    map["sourcePort"] = m[3]
                rescue
                    # no match (m is nil): leave the map empty
                end
            '
        }
    } else if [@metadata][restOfLine] =~ /session closed/ {
        # Last message of the session: rebuild the message with the stored source details
        aggregate {
            map_action => "update"
            end_of_task => true
            timeout => 120
            task_id => "%{[@metadata][program]}"
            code => '
                sourceUserid = map["sourceUserid"]
                sourceIp = map["sourceIp"]
                sourcePort = map["sourcePort"]
                prefixMessage = event.get("[@metadata][ts]") + " " + event.get("[@metadata][server]") + " " + event.get("[@metadata][program]")
                finalMessage = prefixMessage + ": " + "Disconnected from #{sourceIp} port #{sourcePort} + " + event.get("[@metadata][restOfLine]")
                event.set("message", finalMessage)
            '
        }
    } else { drop{} }
}
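
To double-check the regex used in the first aggregate block, it can be run outside Logstash in plain Ruby. This is only a minimal sketch: the sample line is the first message from the output above, and the plain Hash named `map` merely stands in for the aggregate map, which behaves differently inside the filter.

```ruby
# Sample line taken from the first event in the output above
message = "<86>Mar 24 12:13:13 localhost sshd[19655]: Accepted password for dcn from 10.8.4.23 port 55064 ssh2"

# Same pattern as in the aggregate code block
m = message.match(/Accepted password for (\w+) from (.*) port (\d+)/)

# Plain Hash standing in for the aggregate map (illustration only)
map = {}
unless m.nil?
  map["sourceUserid"] = m[1]
  map["sourceIp"]     = m[2]
  map["sourcePort"]   = m[3]
end

puts map["sourceUserid"]  # dcn
puts map["sourceIp"]      # 10.8.4.23
puts map["sourcePort"]    # 55064
```

So the three captures look fine for this sample line; the user ID is captured, it just isn't used afterwards.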

I managed to store the source user ID in a variable, but it isn't used in the final message. It would be good if I could use it to confirm that sourceUserid actually appears in [@metadata][restOfLine] in the else if branch, but I'm not sure whether that can be done.
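
What I have in mind is roughly the check below, written as plain Ruby outside Logstash. It is a sketch only: `closed_for_user?` is a hypothetical helper name of my own, not part of the aggregate filter, and the pattern assumes the "session closed for user <name>" wording seen in my sample log.

```ruby
# Hypothetical helper (not part of any Logstash API): returns true when the
# rest of the line reports a closed session for exactly the given user.
def closed_for_user?(rest_of_line, user)
  return false if user.nil? || rest_of_line.nil?
  # Escape the user name so special characters are matched literally;
  # \b stops "dcn" from matching a longer name such as "dcn2".
  !!(rest_of_line =~ /session closed for user #{Regexp.escape(user)}\b/)
end

rest = "pam_unix(sshd:session): session closed for user dcn"
puts closed_for_user?(rest, "dcn")   # true
puts closed_for_user?(rest, "root")  # false
```

Presumably the same condition could be inlined in the code of the second aggregate block, comparing map["sourceUserid"] against event.get("[@metadata][restOfLine]") before calling event.set, but I have not tested that inside Logstash.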