That is even simpler:
# Break the raw line into a timestamp, one skipped token (the empty %{}),
# the program name (up to the colon) and the remaining text. All captured
# values are written under [@metadata].
dissect {
  mapping => {
    "message" => "%{[@metadata][ts]} %{} %{[@metadata][program]}: %{[@metadata][restOfLine]}"
  }
}

# Parse the captured timestamp, which is expected to be in ISO8601 form.
date {
  match => [ "[@metadata][ts]", "ISO8601" ]
}
# Correlate a login line ("Accepted ... for <user> from ...") with the later
# "session closed" line that carries the same program value, so the closing
# event can be annotated with the client address and port.
# NOTE(review): this assumes [@metadata][program] is unique per session
# (for example because it embeds a process id) - confirm against real input.
if [@metadata][restOfLine] =~ /Accepted .* for \w+ from/ {
  # Login line: open a task keyed by the program value and remember the peer.
  aggregate {
    task_id => "%{[@metadata][program]}"
    map_action => "create"
    code => '
      # Capture address and port with an explicit match check. A blanket
      # rescue would hide every error, not just a line that fails to match.
      m = event.get("message").to_s.match(/Accepted .* for \w+ from (.*) port (\d+) /)
      if m
        map["sourceIp"] = m[1]
        map["sourcePort"] = m[2]
      end
    '
  }
} else if [@metadata][restOfLine] =~ /session closed/ {
  # Close line: only runs when a task for this program already exists
  # (map_action update), then ends the task.
  aggregate {
    task_id => "%{[@metadata][program]}"
    map_action => "update"
    end_of_task => true
    # NOTE(review): per the aggregate filter documentation the timeout counts
    # seconds since the first event of a task, so a session that stays open
    # longer than 120 s would lose its map before this line arrives - confirm
    # that 120 is long enough for the sessions being tracked.
    timeout => 120
    code => '
      source_ip = map["sourceIp"]
      source_port = map["sourcePort"]
      # Append the suffix only when the login line yielded both values;
      # otherwise the message would end in an empty from/port fragment.
      if source_ip && source_port
        message = event.get("message")
        event.set("message", "#{message} from #{source_ip} port #{source_port}")
      end
    '
  }
}