Aggregate secure/sshd syslog events based on selected events

That is even simpler

# Break the raw line into: timestamp, one skipped field (%{} - presumably the
# host name, confirm against your syslog format), the program tag up to the
# colon, and the remaining text. All pieces are kept under [@metadata].
dissect {
    mapping => {
        "message" => "%{[@metadata][ts]} %{} %{[@metadata][program]}: %{[@metadata][restOfLine]}"
    }
}

# Parse the extracted timestamp as ISO8601; with no explicit target the date
# filter stores the result in @timestamp.
date {
    match => [ "[@metadata][ts]", "ISO8601" ]
}
# Correlate the sshd "Accepted ..." line with the later "session closed" line
# that carries the same program tag, so the close event can report where the
# session came from.
# NOTE: the aggregate plugin documentation states the pipeline must run with a
# single worker (-w 1) for events to be correlated reliably.
if [@metadata][restOfLine] =~ /Accepted .* for \S+ from/ {
    # Login line: open a map for this task and remember the peer address/port.
    aggregate {
        task_id => "%{[@metadata][program]}"
        map_action => "create"
        code => '
            # \S+ (not \w+) for the user so names containing "-" or "." match;
            # \S+ for the host avoids a greedy capture; \b instead of a
            # mandatory trailing space tolerates a line that ends at the port.
            m = event.get("message").to_s.match(/Accepted .* for \S+ from (\S+) port (\d+)\b/)
            if m
                map["sourceIp"] = m[1]
                map["sourcePort"] = m[2]
            end
        '
    }
} else if [@metadata][restOfLine] =~ /session closed/ {
    # Logout line: enrich the message from the stored map and finish the task.
    # With map_action "update" the code only runs when a map already exists.
    aggregate {
        task_id => "%{[@metadata][program]}"
        map_action => "update"
        end_of_task => true
        # Maps without a matching close line are dropped after 120 seconds.
        timeout => 120
        code => '
            sourceIp = map["sourceIp"]
            sourcePort = map["sourcePort"]
            # Skip the suffix when the login line could not be parsed, rather
            # than appending an empty " from  port " fragment.
            if sourceIp && sourcePort
                event.set("message", event.get("message").to_s + " from #{sourceIp} port #{sourcePort}")
            end
        '
    }
}