Aggregate secure/sshd syslog event based on selected events

Hi, I'm trying to pick the login and logout events out of the Linux SSH events sent as syslog to Logstash and forward them to my firewall via syslog again. This setup allows my firewall to map the user-id to the IP address in the traffic logs.

These are the raw logs; I'm only interested in particular entries:
2023-03-16T17:26:27+08:00 poc-rsyslog-client sshd[8487]: Accepted password for test_user from 10.8.4.23 port 35900 ssh2
2023-03-16T17:26:27+08:00 poc-rsyslog-client sshd[8487]: pam_unix(sshd:session): session opened for user test_user by (uid=0)
2023-03-16T17:27:27+08:00 poc-rsyslog-client sshd[8487]: Received disconnect from 10.8.4.23 port 35900:11: disconnected by user
2023-03-16T17:27:27+08:00 poc-rsyslog-client sshd[8487]: Disconnected from 10.8.4.23 port 35900
2023-03-16T17:27:27+08:00 poc-rsyslog-client sshd[8487]: pam_unix(sshd:session): session closed for user test_user

For the login event I need the first log, which contains the user-id (test_user) and IP address (10.8.4.23).
For the logout event, I need to correlate the fourth and fifth logs, which together contain the user-id and IP address.

Below is my configuration:

input {
        syslog {
                        host => "10.8.4.20"
                        port => 55514
        }
}

filter {
        if "Accepted" in [message]{
                grok {
                        match => {
                        "message" => "%{DATA:event} %{DATA:srcmethod} for %{DATA:srcuser} from %{IPORHOST:srcip} port %{NUMBER:srcport} %{GREEDYDATA:signature}"
                        }
                }
        }
        else{
                if "Disconnected" in [message] or "close" in [message]{
                        aggregate {
                                 task_id => "%{syslog_pid}"
                                 code => '
                                         map["message"] ||= []
                                         map["message"] << event.get("message")
                                         event.cancel
                                 '
                                 push_map_as_event_on_timeout => true
                                 timeout => 10
                         }
                }
                else{ drop {} }
        }
}

output {
        syslog {
                        host => "10.9.9.2"
                        port => "514"
                        protocol => "udp"
        }
        syslog {
                        host => "10.9.9.3"
                        port => "514"
                        protocol => "udp"
        }
        stdout {}
}

and the result I got from stdout is as follows:

{
         "event" => {
        "original" => "<86>Mar 22 20:44:35 localhost sshd[17161]: Accepted password for test_user from 10.8.4.23 port 47506 ssh2"
    },
       "srcuser" => "test_user",
    "@timestamp" => 2023-03-22T12:44:35.000Z,
          "host" => {
              "ip" => "10.8.246.200",
        "hostname" => "localhost"
    },
       "message" => "Accepted password for root from 10.8.4.23 port 47506 ssh2",
       "process" => {
         "pid" => 17161,
        "name" => "sshd"
    },
       "service" => {
        "type" => "system"
    },
       "srcport" => "47506",
         "srcip" => "10.8.4.23",
     "srcmethod" => "password",
      "@version" => "1",
           "log" => {
        "syslog" => {
            "priority" => 86,
            "severity" => {
                "name" => "Informational",
                "code" => 6
            },
            "facility" => {
                "name" => "security/authorization",
                "code" => 10
            }
        }
    },
     "signature" => "ssh2"
}
{
      "@version" => "1",
         "event" => {
        "original" => "<86>Mar 22 20:44:39 localhost sshd[17161]: Disconnected from 10.8.4.23 port 47506"
    },
           "log" => {
        "syslog" => {
            "priority" => 86,
            "severity" => {
                "name" => "Informational",
                "code" => 6
            },
            "facility" => {
                "name" => "security/authorization",
                "code" => 10
            }
        }
    },
    "@timestamp" => 2023-03-22T12:44:39.000Z,
          "host" => {
              "ip" => "10.8.246.200",
        "hostname" => "localhost"
    },
       "message" => "Disconnected from 10.8.4.23 port 47506",
       "process" => {
         "pid" => 17161,
        "name" => "sshd"
    },
       "service" => {
        "type" => "system"
    }
}
{
      "@version" => "1",
         "event" => {
        "original" => "<86>Mar 22 20:44:39 localhost sshd[17161]: pam_unix(sshd:session): session closed for user test_user"
    },
           "log" => {
        "syslog" => {
            "priority" => 86,
            "severity" => {
                "name" => "Informational",
                "code" => 6
            },
            "facility" => {
                "name" => "security/authorization",
                "code" => 10
            }
        }
    },
    "@timestamp" => 2023-03-22T12:44:39.000Z,
          "host" => {
              "ip" => "10.8.246.200",
        "hostname" => "localhost"
    },
       "message" => "pam_unix(sshd:session): session closed for user test_user",
       "process" => {
         "pid" => 17161,
        "name" => "sshd"
    },
       "service" => {
        "type" => "system"
    }
}

How do I achieve my second goal, which is to correlate the last 2 outputs into a single output?

You could try an aggregate filter. Something similar to example 1.

I tried, but the aggregate filter is not working at all. Events are still written to stdout one by one, as shown in the output sample. Did I code it wrongly?
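A possible cause, judging only from the stdout sample above: the events carry the PID under [process][pid] and contain no syslog_pid field, so "%{syslog_pid}" would likely never resolve and the aggregate filter would have no usable task ID. A minimal sketch of the same filter keyed on that field follows. It assumes the syslog input keeps emitting the ECS-style fields shown in the sample; the stricter match strings are my own choice, not part of the original configuration.

```
filter {
    if "Disconnected from" in [message] or "session closed" in [message] {
        aggregate {
            # Assumption: the PID lives in [process][pid], as in the stdout sample
            task_id => "%{[process][pid]}"
            code => '
                # Collect both logout lines for this sshd process
                map["message"] ||= []
                map["message"] << event.get("message")
                # Suppress the individual event; only the aggregated map is emitted
                event.cancel
            '
            push_map_as_event_on_timeout => true
            timeout => 10
        }
    }
}
```

With this shape the combined event only appears once the timeout expires, and its message field is an array of the collected lines rather than a single string.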

I ended up doing this using

   dissect { mapping => { "message" => "%{[@metadata][ts]} %{} %{[@metadata][program]}: %{[@metadata][restOfLine]}" } }
    date { match => [ "[@metadata][ts]", "ISO8601" ] }
    if [@metadata][restOfLine] =~ /Accepted .* for \w+ from/ {
        aggregate {
            task_id => "%{[@metadata][program]}"
            map_action => "create"
            code => '
                begin
                    m = event.get("message").match(/Accepted (.*) for (\w+) from (.*) port (\d+) /)
                    map["startSessionTime"] = event.get("@timestamp")
                    map["authenticationMethod"] = m[1]
                    map["username"] = m[2]
                    map["sourceIp"] = m[3]
                    map["sourcePort"] = m[4]
                    map["id"] = event.get("[@metadata][program]")
                rescue
                end
            '
        }
    } else if [@metadata][restOfLine] =~ /session closed/ {
        aggregate {
            task_id => "%{[@metadata][program]}"
            code => 'map["endSessionTime"] = event.get("@timestamp")'
        }
        aggregate {
            task_id => "%{[@metadata][program]}"
            code => ''
            push_previous_map_as_event => true
            timeout => 5
        }
    } else if [@metadata][restOfLine] =~ /session opened/ {
        aggregate {
            task_id => "%{[@metadata][program]}"
            code => '
                begin
                    m = event.get("message").match(/session opened .* by \((.*)\)/)
                    map["sessionOpener"] = m[1]
                rescue
                end
            '
        }
   }

which will create additional events like

{
          "sourcePort" => "35900",
          "@timestamp" => 2023-03-22T19:55:25.329409622Z,
            "sourceIp" => "10.8.4.23",
"authenticationMethod" => "public key",
    "startSessionTime" => 2023-04-16T09:26:27.000Z,
            "@version" => "1",
                  "id" => "sshd[487]",
       "sessionOpener" => "uid=0",
      "endSessionTime" => 2023-04-16T09:27:27.000Z,
            "username" => "test_user"
}

The reason I use two aggregate filters to handle session closed is that when the code option executes due to push_previous_map_as_event the event is no longer available. The code is executed against the new event that is created.

Make sure you have pipeline.workers set to 1 and pipeline.ordered evaluates to true.

Thanks for the code.

However, the output is a bit different from what I expected. Instead of a single output, I actually require 2 outputs.

1st output: Login event referencing raw input - 2023-03-16T17:26:27+08:00 poc-rsyslog-client sshd[8487]: Accepted password for test_user from 10.8.4.23 port 35900 ssh2

2nd output: Logout event referencing raw input, which requires concatenation (for lack of a better word) - 2023-03-16T17:27:27+08:00 poc-rsyslog-client sshd[8487]: Disconnected from 10.8.4.23 port 35900 + session closed for user test_user

These 2 separate outputs will be registered on my firewall so that it can keep track of the user and release the user-id to IP address mapping in the session table upon the logout event. P.S. the times of the events should be different; in my example I log in and log out almost immediately. I think we should match via the [sshd-id].

That is even simpler

dissect { mapping => { "message" => "%{[@metadata][ts]} %{} %{[@metadata][program]}: %{[@metadata][restOfLine]}" } }
date { match => [ "[@metadata][ts]", "ISO8601" ] }
if [@metadata][restOfLine] =~ /Accepted .* for \w+ from/ {
    aggregate {
        task_id => "%{[@metadata][program]}"
        map_action => "create"
        code => '
            begin
                m = event.get("message").match(/Accepted .* for \w+ from (.*) port (\d+) /)
                map["sourceIp"] = m[1]
                map["sourcePort"] = m[2]
            rescue
            end
        '
    }
} else if [@metadata][restOfLine] =~ /session closed/ {
    aggregate {
        map_action => "update"
        end_of_task => true
        timeout => 120
        task_id => "%{[@metadata][program]}"
        code => '
            sourceIp =  map["sourceIp"]
            sourcePort = map["sourcePort"]
            event.set("message", event.get("message") + " from #{sourceIp} port #{sourcePort}")
        '
    }
}

Hi Badger,

I have tested your code, but it still writes every sshd event to stdout. I did a tcpdump on the syslog output and verified that it is sending every single event out too.

[LOGIN]

[2023-03-23T10:14:04,413][WARN ][org.logstash.dissect.Dissector][main][ff1085065ad71bde28735b0c5d3224586821c04377b8b0a1219761b2f8031380] Dissector mapping, pattern not found {"field"=>"message", "pattern"=>"%{[@metadata][ts]} %{} %{[@metadata][program]}: %{[@metadata][restOfLine]}", "event"=>{"@version"=>"1", "tags"=>["_dissectfailure"], "@timestamp"=>2023-03-23T02:14:02.000Z, "host"=>{"hostname"=>"localhost", "ip"=>"10.8.246.200"}, "message"=>"Accepted password for root from 10.8.4.23 port 59752 ssh2", "event"=>{"original"=>"<86>Mar 23 10:14:02 localhost sshd[17502]: Accepted password for root from 10.8.4.23 port 59752 ssh2"}, "process"=>{"pid"=>17502, "name"=>"sshd"}, "service"=>{"type"=>"system"}, "log"=>{"syslog"=>{"priority"=>86, "facility"=>{"code"=>10, "name"=>"security/authorization"}, "severity"=>{"code"=>6, "name"=>"Informational"}}}}}
{
      "@version" => "1",
          "tags" => [
        [0] "_dissectfailure"
    ],
    "@timestamp" => 2023-03-23T02:14:02.000Z,
          "host" => {
        "hostname" => "localhost",
              "ip" => "10.8.246.200"
    },
       "message" => "Accepted password for root from 10.8.4.23 port 59752 ssh2",
         "event" => {
        "original" => "<86>Mar 23 10:14:02 localhost sshd[17502]: Accepted password for root from 10.8.4.23 port 59752 ssh2"
    },
       "process" => {
         "pid" => 17502,
        "name" => "sshd"
    },
       "service" => {
        "type" => "system"
    },
           "log" => {
        "syslog" => {
            "priority" => 86,
            "facility" => {
                "code" => 10,
                "name" => "security/authorization"
            },
            "severity" => {
                "code" => 6,
                "name" => "Informational"
            }
        }
    }
}
[2023-03-23T10:14:04,593][WARN ][org.logstash.dissect.Dissector][main][ff1085065ad71bde28735b0c5d3224586821c04377b8b0a1219761b2f8031380] Dissector mapping, pattern not found {"field"=>"message", "pattern"=>"%{[@metadata][ts]} %{} %{[@metadata][program]}: %{[@metadata][restOfLine]}", "event"=>{"@version"=>"1", "tags"=>["_dissectfailure"], "@timestamp"=>2023-03-23T02:14:02.000Z, "host"=>{"hostname"=>"localhost", "ip"=>"10.8.246.200"}, "message"=>"New session 69 of user root.", "event"=>{"original"=>"<38>Mar 23 10:14:02 localhost systemd-logind: New session 69 of user root."}, "process"=>{"name"=>"systemd-logind"}, "service"=>{"type"=>"system"}, "log"=>{"syslog"=>{"priority"=>38, "facility"=>{"code"=>4, "name"=>"security/authorization"}, "severity"=>{"code"=>6, "name"=>"Informational"}}}}}
[2023-03-23T10:14:04,594][WARN ][org.logstash.dissect.Dissector][main][ff1085065ad71bde28735b0c5d3224586821c04377b8b0a1219761b2f8031380] Dissector mapping, pattern not found {"field"=>"message", "pattern"=>"%{[@metadata][ts]} %{} %{[@metadata][program]}: %{[@metadata][restOfLine]}", "event"=>{"@version"=>"1", "tags"=>["_dissectfailure"], "@timestamp"=>2023-03-23T02:14:02.000Z, "host"=>{"hostname"=>"localhost", "ip"=>"10.8.246.200"}, "message"=>"Started Session 69 of user root.", "event"=>{"original"=>"<30>Mar 23 10:14:02 localhost systemd: Started Session 69 of user root."}, "process"=>{"name"=>"systemd"}, "service"=>{"type"=>"system"}, "log"=>{"syslog"=>{"priority"=>30, "facility"=>{"code"=>3, "name"=>"system"}, "severity"=>{"code"=>6, "name"=>"Informational"}}}}}
[2023-03-23T10:14:04,594][WARN ][org.logstash.dissect.Dissector][main][ff1085065ad71bde28735b0c5d3224586821c04377b8b0a1219761b2f8031380] Dissector mapping, pattern not found {"field"=>"message", "pattern"=>"%{[@metadata][ts]} %{} %{[@metadata][program]}: %{[@metadata][restOfLine]}", "event"=>{"@version"=>"1", "tags"=>["_dissectfailure"], "@timestamp"=>2023-03-23T02:14:02.000Z, "host"=>{"hostname"=>"localhost", "ip"=>"10.8.246.200"}, "message"=>"pam_unix(sshd:session): session opened for user root by (uid=0)", "event"=>{"original"=>"<86>Mar 23 10:14:02 localhost sshd[17502]: pam_unix(sshd:session): session opened for user root by (uid=0)"}, "process"=>{"pid"=>17502, "name"=>"sshd"}, "service"=>{"type"=>"system"}, "log"=>{"syslog"=>{"priority"=>86, "facility"=>{"code"=>10, "name"=>"security/authorization"}, "severity"=>{"code"=>6, "name"=>"Informational"}}}}}
{
      "@version" => "1",
          "tags" => [
        [0] "_dissectfailure"
    ],
    "@timestamp" => 2023-03-23T02:14:02.000Z,
          "host" => {
        "hostname" => "localhost",
              "ip" => "10.8.246.200"
    },
       "message" => "New session 69 of user root.",
         "event" => {
        "original" => "<38>Mar 23 10:14:02 localhost systemd-logind: New session 69 of user root."
    },
       "process" => {
        "name" => "systemd-logind"
    },
       "service" => {
        "type" => "system"
    },
           "log" => {
        "syslog" => {
            "priority" => 38,
            "facility" => {
                "code" => 4,
                "name" => "security/authorization"
            },
            "severity" => {
                "code" => 6,
                "name" => "Informational"
            }
        }
    }
}
{
      "@version" => "1",
          "tags" => [
        [0] "_dissectfailure"
    ],
    "@timestamp" => 2023-03-23T02:14:02.000Z,
          "host" => {
        "hostname" => "localhost",
              "ip" => "10.8.246.200"
    },
       "message" => "Started Session 69 of user root.",
         "event" => {
        "original" => "<30>Mar 23 10:14:02 localhost systemd: Started Session 69 of user root."
    },
       "process" => {
        "name" => "systemd"
    },
       "service" => {
        "type" => "system"
    },
           "log" => {
        "syslog" => {
            "priority" => 30,
            "facility" => {
                "code" => 3,
                "name" => "system"
            },
            "severity" => {
                "code" => 6,
                "name" => "Informational"
            }
        }
    }
}
{
      "@version" => "1",
          "tags" => [
        [0] "_dissectfailure"
    ],
    "@timestamp" => 2023-03-23T02:14:02.000Z,
          "host" => {
        "hostname" => "localhost",
              "ip" => "10.8.246.200"
    },
       "message" => "pam_unix(sshd:session): session opened for user root by (uid=0)",
         "event" => {
        "original" => "<86>Mar 23 10:14:02 localhost sshd[17502]: pam_unix(sshd:session): session opened for user root by (uid=0)"
    },
       "process" => {
         "pid" => 17502,
        "name" => "sshd"
    },
       "service" => {
        "type" => "system"
    },
           "log" => {
        "syslog" => {
            "priority" => 86,
            "facility" => {
                "code" => 10,
                "name" => "security/authorization"
            },
            "severity" => {
                "code" => 6,
                "name" => "Informational"
            }
        }
    }
}

[LOGOUT]

[2023-03-23T10:14:24,935][WARN ][org.logstash.dissect.Dissector][main][ff1085065ad71bde28735b0c5d3224586821c04377b8b0a1219761b2f8031380] Dissector mapping, pattern not found {"field"=>"message", "pattern"=>"%{[@metadata][ts]} %{} %{[@metadata][program]}: %{[@metadata][restOfLine]}", "event"=>{"@version"=>"1", "tags"=>["_dissectfailure"], "@timestamp"=>2023-03-23T02:14:22.000Z, "host"=>{"hostname"=>"localhost", "ip"=>"10.8.246.200"}, "message"=>"Disconnected from 10.8.4.23 port 59752", "event"=>{"original"=>"<86>Mar 23 10:14:22 localhost sshd[17502]: Disconnected from 10.8.4.23 port 59752"}, "process"=>{"pid"=>17502, "name"=>"sshd"}, "service"=>{"type"=>"system"}, "log"=>{"syslog"=>{"priority"=>86, "facility"=>{"code"=>10, "name"=>"security/authorization"}, "severity"=>{"code"=>6, "name"=>"Informational"}}}}}
[2023-03-23T10:14:24,935][WARN ][org.logstash.dissect.Dissector][main][ff1085065ad71bde28735b0c5d3224586821c04377b8b0a1219761b2f8031380] Dissector mapping, pattern not found {"field"=>"message", "pattern"=>"%{[@metadata][ts]} %{} %{[@metadata][program]}: %{[@metadata][restOfLine]}", "event"=>{"@version"=>"1", "tags"=>["_dissectfailure"], "@timestamp"=>2023-03-23T02:14:22.000Z, "host"=>{"hostname"=>"localhost", "ip"=>"10.8.246.200"}, "message"=>"pam_unix(sshd:session): session closed for user root", "event"=>{"original"=>"<86>Mar 23 10:14:22 localhost sshd[17502]: pam_unix(sshd:session): session closed for user root"}, "process"=>{"pid"=>17502, "name"=>"sshd"}, "service"=>{"type"=>"system"}, "log"=>{"syslog"=>{"priority"=>86, "facility"=>{"code"=>10, "name"=>"security/authorization"}, "severity"=>{"code"=>6, "name"=>"Informational"}}}}}
[2023-03-23T10:14:24,937][WARN ][org.logstash.dissect.Dissector][main][ff1085065ad71bde28735b0c5d3224586821c04377b8b0a1219761b2f8031380] Dissector mapping, pattern not found {"field"=>"message", "pattern"=>"%{[@metadata][ts]} %{} %{[@metadata][program]}: %{[@metadata][restOfLine]}", "event"=>{"@version"=>"1", "tags"=>["_dissectfailure"], "@timestamp"=>2023-03-23T02:14:22.000Z, "host"=>{"hostname"=>"localhost", "ip"=>"10.8.246.200"}, "message"=>"Removed session 69.", "event"=>{"original"=>"<38>Mar 23 10:14:22 localhost systemd-logind: Removed session 69."}, "process"=>{"name"=>"systemd-logind"}, "service"=>{"type"=>"system"}, "log"=>{"syslog"=>{"priority"=>38, "facility"=>{"code"=>4, "name"=>"security/authorization"}, "severity"=>{"code"=>6, "name"=>"Informational"}}}}}
{
      "@version" => "1",
          "tags" => [
        [0] "_dateparsefailure"
    ],
    "@timestamp" => 2023-03-23T02:14:22.000Z,
          "host" => {
        "hostname" => "localhost",
              "ip" => "10.8.246.200"
    },
       "message" => "Received disconnect from 10.8.4.23 port 59752:11: disconnected by user",
         "event" => {
        "original" => "<86>Mar 23 10:14:22 localhost sshd[17502]: Received disconnect from 10.8.4.23 port 59752:11: disconnected by user"
    },
       "process" => {
         "pid" => 17502,
        "name" => "sshd"
    },
       "service" => {
        "type" => "system"
    },
           "log" => {
        "syslog" => {
            "priority" => 86,
            "facility" => {
                "code" => 10,
                "name" => "security/authorization"
            },
            "severity" => {
                "code" => 6,
                "name" => "Informational"
            }
        }
    }
}
{
      "@version" => "1",
          "tags" => [
        [0] "_dissectfailure"
    ],
    "@timestamp" => 2023-03-23T02:14:22.000Z,
          "host" => {
        "hostname" => "localhost",
              "ip" => "10.8.246.200"
    },
       "message" => "Disconnected from 10.8.4.23 port 59752",
         "event" => {
        "original" => "<86>Mar 23 10:14:22 localhost sshd[17502]: Disconnected from 10.8.4.23 port 59752"
    },
       "process" => {
         "pid" => 17502,
        "name" => "sshd"
    },
       "service" => {
        "type" => "system"
    },
           "log" => {
        "syslog" => {
            "priority" => 86,
            "facility" => {
                "code" => 10,
                "name" => "security/authorization"
            },
            "severity" => {
                "code" => 6,
                "name" => "Informational"
            }
        }
    }
}
{
      "@version" => "1",
          "tags" => [
        [0] "_dissectfailure"
    ],
    "@timestamp" => 2023-03-23T02:14:22.000Z,
          "host" => {
        "hostname" => "localhost",
              "ip" => "10.8.246.200"
    },
       "message" => "pam_unix(sshd:session): session closed for user root",
         "event" => {
        "original" => "<86>Mar 23 10:14:22 localhost sshd[17502]: pam_unix(sshd:session): session closed for user root"
    },
       "process" => {
         "pid" => 17502,
        "name" => "sshd"
    },
       "service" => {
        "type" => "system"
    },
           "log" => {
        "syslog" => {
            "priority" => 86,
            "facility" => {
                "code" => 10,
                "name" => "security/authorization"
            },
            "severity" => {
                "code" => 6,
                "name" => "Informational"
            }
        }
    }
}
{
      "@version" => "1",
          "tags" => [
        [0] "_dissectfailure"
    ],
    "@timestamp" => 2023-03-23T02:14:22.000Z,
          "host" => {
        "hostname" => "localhost",
              "ip" => "10.8.246.200"
    },
       "message" => "Removed session 69.",
         "event" => {
        "original" => "<38>Mar 23 10:14:22 localhost systemd-logind: Removed session 69."
    },
       "process" => {
        "name" => "systemd-logind"
    },
       "service" => {
        "type" => "system"
    },
           "log" => {
        "syslog" => {
            "priority" => 38,
            "facility" => {
                "code" => 4,
                "name" => "security/authorization"
            },
            "severity" => {
                "code" => 6,
                "name" => "Informational"
            }
        }
    }
}

I'm sorry, I made the mistake of assuming that your messages had the format that you showed in your original question. You can change the dissect

dissect { mapping => { "message" => "<%{}>%{[@metadata][ts]} %{} %{[@metadata][program]}: ...

to fix the _dissectfailure issues.
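Judging from the WARN lines above, the syslog input has already stripped the header: [message] holds only the text after "sshd[17502]: ", and the program name and PID are available as [process][name] and [process][pid]. Under that assumption, one alternative to changing the dissect pattern is to drop the dissect and date filters and fill the two metadata fields that the later conditionals rely on directly. This is only a sketch; the mutate block is a hypothetical shim, not part of the original answer.

```
filter {
    if [process][name] == "sshd" {
        mutate {
            # The header is already removed, so the whole message is the "rest of line"
            copy => { "message" => "[@metadata][restOfLine]" }
            # Rebuild an identifier such as "sshd[17502]" to use as the aggregate task_id
            add_field => { "[@metadata][program]" => "%{[process][name]}[%{[process][pid]}]" }
        }
    }
}
```

The conditionals on [@metadata][restOfLine] and the task_id of "%{[@metadata][program]}" can then stay as written.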

Not sure what you mean by this. My code modifies the "session closed" message to include details from the "Accepted password for test_user" message. If you only want those two messages kept, you could change

if [@metadata][restOfLine] =~ /Accepted .* for \w+ from/ {
    ....
} else if [@metadata][restOfLine] =~ /session closed/ {
    ....
}

to

if [@metadata][restOfLine] =~ /Accepted .* for \w+ from/ {
    ....
} else if [@metadata][restOfLine] =~ /session closed/ {
    ....
} else {
    drop {}
}

I cannot see a use case where I would want that.

As mentioned in the initial question,

We are only interested in the input highlighted in bold.
2023-03-16T17:26:27+08:00 poc-rsyslog-client sshd[8487]: Accepted password for test_user from 10.8.4.23 port 35900 ssh2
2023-03-16T17:26:27+08:00 poc-rsyslog-client sshd[8487]: pam_unix(sshd:session): session opened for user test_user by (uid=0)
2023-03-16T17:27:27+08:00 poc-rsyslog-client sshd[8487]: Received disconnect from 10.8.4.23 port 35900:11: disconnected by user
2023-03-16T17:27:27+08:00 poc-rsyslog-client sshd[8487]: Disconnected from 10.8.4.23 port 35900
2023-03-16T17:27:27+08:00 poc-rsyslog-client sshd[8487]: pam_unix(sshd:session): session closed for user test_user

These 3 logs let us build the login and logout events that need to be sent to the firewall to build the user-id to IP address mapping. However, the code you provided still writes every single event to stdout; those events should be filtered out before forwarding to the firewall.

Total should be only 2 outputs that I shall see:

  1. Accepted password for test_user from 10.8.4.23 port 35900 ssh2
  2. Disconnected from 10.8.4.23 port 35900 + session closed for user test_user (concatenated from 2 different inputs)

With each of these outputs, my firewall can apply a regex to extract the user-id (test_user) along with the IP address (10.8.4.23) for the login and logout events. This way, our firewall traffic log will be able to show which user-id is behind each IP address.

If you can explain the difference between the output from this answer and what you want I can attempt to address it tomorrow.

I see no reason to involve the second highlighted log entry (in your first log entry) when that data can be aggregated from the first to the third.

If for some reason you prefer to take the host and port from the "Disconnected from" message then just update my code to do so.

If you do not think "sshd[8487]" can be used as the task_id to aggregate entries then say what can be used.

Instead of aggregating the first to the third logs, I need [Output 1 (first)] and [Output 2 (second + third)] as 2 separate outputs.

So I should only see these 2 outputs. Expected result:

[OUTPUT 1]

{
    "@timestamp" => 2023-03-23T03:42:42.000Z,
           "log" => {
        "syslog" => {
            "severity" => {
                "code" => 6,
                "name" => "Informational"
            },
            "priority" => 86,
            "facility" => {
                "code" => 10,
                "name" => "security/authorization"
            }
        }
    },
      "@version" => "1",
       "message" => "Accepted password for root from 10.8.4.23 port 32874 ssh2",
         "event" => {
        "original" => "<86>Mar 23 11:42:42 localhost sshd[17565]: Accepted password for root from 10.8.4.23 port 32874 ssh2"
    },
          "host" => {
        "hostname" => "localhost",
              "ip" => "10.8.246.200"
    },
       "process" => {
         "pid" => 17565,
        "name" => "sshd"
    },
       "service" => {
        "type" => "system"
    }
}

[OUTPUT 2]

{
    "@timestamp" => 2023-03-23T03:42:55.000Z,
           "log" => {
        "syslog" => {
            "severity" => {
                "code" => 6,
                "name" => "Informational"
            },
            "priority" => 86,
            "facility" => {
                "code" => 10,
                "name" => "security/authorization"
            }
        }
    },
      "@version" => "1",
       "message" => "Disconnected from 10.8.4.23 port 32874 + pam_unix(sshd:session): session closed for user root",
         "event" => {
        "original" => "<86>Mar 23 11:42:55 localhost sshd[17565]: Disconnected from 10.8.4.23 port 32874 + pam_unix(sshd:session): session closed for user root"
    },
          "host" => {
        "hostname" => "localhost",
              "ip" => "10.8.246.200"
    },
       "process" => {
         "pid" => 17565,
        "name" => "sshd"
    },
       "service" => {
        "type" => "system"
    }
}

Take note of the + sign in output 2; the message is concatenated from 2 event logs.

On my firewall, the image below shows the syslog filter I would need to create for both the login and logout events.

The main issue here is that Logstash is event based: each event is independent of the others. If you need to correlate two or more events while indexing, things start to get tricky.

You basically only have the aggregate filter to combine multiple events, but even the aggregate filter has some limitations like the need to have a common value to be used as an id to aggregate.

In your second case you don't have anything in the two messages that could be used as an id to aggregate.

2023-03-16T17:27:27+08:00 poc-rsyslog-client sshd[8487]: Disconnected from 10.8.4.23 port 35900
2023-03-16T17:27:27+08:00 poc-rsyslog-client sshd[8487]: pam_unix(sshd:session): session closed for user test_user

There is nothing on these two lines that could be used to correlate the events, and you also cannot aggregate the login message and the logout message because they can be a long time apart from each other.

I don't think you will be able to do what you want using the aggregate filter.

However, you may achieve your goal if you use memcached as an auxiliary tool and the memcached filter.

You would use memcached filter to store the ip, port and user for each login event, and when you have a disconnect event you would again use the memcached filter to get the information about the user with that ip and port.

For example, for the login event below:

2023-03-16T17:26:27+08:00 poc-rsyslog-client sshd[8487]: Accepted password for test_user from 10.8.4.23 port 35900 ssh2

You would parse it and create a field combining the IP and port. You would then use this value as the key for an entry in memcached, with the user as the value of this key, something like this:

"10.8.4.23-35900": "test_user"

Then, when you get the disconnect event below:

2023-03-16T17:27:27+08:00 poc-rsyslog-client sshd[8487]: Disconnected from 10.8.4.23 port 35900

You would use the memcached filter and query for the key 10.8.4.23-35900 to get the user associated with this IP and port pair and add it to your message.

With this you can create the messages as you want to send to your syslog and drop everything else if it is not needed.
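The lookup described above could be sketched roughly as follows. This is an untested outline: the memcached address, namespace, TTL and the [@metadata] field names are assumptions for illustration, and the get/set option shapes are as I recall them from the memcached filter documentation, so check them against your installed plugin version.

```
filter {
    if [message] =~ /^Accepted / {
        grok {
            match => { "message" => "Accepted %{NOTSPACE} for %{USERNAME:[@metadata][user]} from %{IP:[@metadata][ip]} port %{INT:[@metadata][port]}" }
        }
        memcached {
            hosts => ["localhost:11211"]     # assumed memcached address
            namespace => "ssh-session"       # hypothetical key prefix
            ttl => 86400                     # assumed upper bound on session length, in seconds
            # Store the user under a key built from IP and port, e.g. "10.8.4.23-35900"
            set => { "[@metadata][user]" => "%{[@metadata][ip]}-%{[@metadata][port]}" }
        }
    } else if [message] =~ /^Disconnected from / {
        grok {
            match => { "message" => "Disconnected from %{IP:[@metadata][ip]} port %{INT:[@metadata][port]}" }
        }
        memcached {
            hosts => ["localhost:11211"]
            namespace => "ssh-session"
            # Fetch the user that was stored at login time
            get => { "%{[@metadata][ip]}-%{[@metadata][port]}" => "[@metadata][user]" }
        }
        if [@metadata][user] {
            mutate {
                replace => { "message" => "%{message} + session closed for user %{[@metadata][user]}" }
            }
        }
    } else {
        drop {}
    }
}
```

Because the state lives in memcached rather than in the pipeline, this approach does not depend on how far apart the login and logout events are, as long as the entry has not expired.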

Hi leandrojmp,

I understand that the login and logout events could be a long time apart. But one thing is for sure: these 2 logs are sent at the same instant during the logout event.

2023-03-16T17:27:27+08:00 poc-rsyslog-client sshd[8487]: Disconnected from 10.8.4.23 port 35900
2023-03-16T17:27:27+08:00 poc-rsyslog-client sshd[8487]: pam_unix(sshd:session): session closed for user test_user

Not sure why, but Linux SSH sends 2 different logs for the logout event, compared to one for the login event.

What if we could use the sshd[id] as the common identifier?

Yeah, looking at it more carefully, I think you are right. This is the process name and the process id, and since sshd spawns a new process for every connection, it can be used as the id.

In this case you can use the aggregate filter, but you just need to aggregate the events starting from the "Disconnected from" message.

This answer from @Badger can help you build the aggregate filter; you just need to change it to consider the "Disconnected from" message.

Remember that the aggregate filter only works if you are running Logstash with just 1 worker.

I've made a small adaptation to @Badger's code and this should work.

filter {
    # Split the raw line into timestamp, host, program (e.g. sshd[8487]) and the rest.
    dissect {
        mapping => {
            "message" => "%{[@metadata][ts]} %{[@metadata][server]} %{[@metadata][program]}: %{[@metadata][restOfLine]}"
        }
    }
    date {
        match => [ "[@metadata][ts]", "ISO8601" ]
    }
    if [@metadata][restOfLine] =~ /Disconnected from/ {
        # Start a task for this sshd process and remember the source IP and port.
        aggregate {
            task_id => "%{[@metadata][program]}"
            map_action => "create"
            code => '
                begin
                    m = event.get("message").match(/Disconnected from (.*) port (\d+)/)
                    map["sourceIp"] = m[1]
                    map["sourcePort"] = m[2]
                rescue
                    # No match: leave the map empty.
                end
            '
        }
    } else if [@metadata][restOfLine] =~ /session closed/ {
        # Finish the task: rebuild the message from the stored IP/port and this line.
        aggregate {
            map_action => "update"
            end_of_task => true
            timeout => 120
            task_id => "%{[@metadata][program]}"
            code => '
                sourceIp = map["sourceIp"]
                sourcePort = map["sourcePort"]
                prefixMessage = event.get("[@metadata][ts]") + " " + event.get("[@metadata][server]") + " " + event.get("[@metadata][program]")
                finalMessage = prefixMessage + ": " + "Disconnected from #{sourceIp} port #{sourcePort} + " + event.get("[@metadata][restOfLine]")
                event.set("message", finalMessage)
            '
        }
    }
}

This will start the aggregation when it receives a Disconnected from message and update it when it receives a session closed message.

The main change in the code is that the final message needs to be rebuilt from the original fields.

This was the result I got from Logstash:

{
      "@version" => "1",
       "message" => "2023-03-16T17:26:27+08:00 poc-rsyslog-client sshd[8487]: Accepted password for test_user from 10.8.4.23 port 35900 ssh2",
    "@timestamp" => 2023-03-16T09:26:27.000Z,
          "host" => "lab"
}
{
      "@version" => "1",
       "message" => "2023-03-16T17:26:27+08:00 poc-rsyslog-client sshd[8487]: pam_unix(sshd:session): session opened for user test_user by (uid=0)",
    "@timestamp" => 2023-03-16T09:26:27.000Z,
          "host" => "lab"
}
{
      "@version" => "1",
       "message" => "2023-03-16T17:27:27+08:00 poc-rsyslog-client sshd[8487]: Received disconnect from 10.8.4.23 port 35900:11: disconnected by user",
    "@timestamp" => 2023-03-16T09:27:27.000Z,
          "host" => "lab"
}
{
      "@version" => "1",
       "message" => "2023-03-16T17:27:27+08:00 poc-rsyslog-client sshd[8487]: Disconnected from 10.8.4.23 port 35900",
    "@timestamp" => 2023-03-16T09:27:27.000Z,
          "host" => "lab"
}
{
      "@version" => "1",
       "message" => "2023-03-16T17:27:27+08:00 poc-rsyslog-client sshd[8487]: Disconnected from 10.8.4.23 port 35900 + pam_unix(sshd:session): session closed for user test_user",
    "@timestamp" => 2023-03-16T09:27:27.000Z,
          "host" => "lab"
}
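
To make the two aggregate stages easier to follow, here is a plain-Ruby simulation of the same flow. It is a sketch and not Logstash code: the maps Hash plays the role of the per-task map kept by the aggregate filter, keyed by the program field (for example sshd[8487]), and the names LINE_RE and aggregate_logout are hypothetical.

```ruby
# Rough equivalent of the dissect mapping: timestamp, server, program, rest of line.
LINE_RE = /\A(\S+) (\S+) (\S+): (.*)\z/

# Feed lines in arrival order; returns the lines that would be emitted downstream.
def aggregate_logout(lines)
  maps = {} # task_id (the program field) => per-session map
  lines.map do |line|
    m = LINE_RE.match(line)
    next line unless m
    ts, server, program, rest = m.captures
    if (d = rest.match(/\ADisconnected from (\S+) port (\d+)/))
      # Stage 1 (map_action "create"): remember IP and port for this sshd process.
      maps[program] = { "sourceIp" => d[1], "sourcePort" => d[2] }
      line
    elsif rest.include?("session closed") && maps.key?(program)
      # Stage 2 (map_action "update" with end_of_task): merge, then forget the task.
      s = maps.delete(program)
      "#{ts} #{server} #{program}: Disconnected from #{s["sourceIp"]} port #{s["sourcePort"]} + #{rest}"
    else
      line
    end
  end
end

sample = [
  "2023-03-16T17:27:27+08:00 poc-rsyslog-client sshd[8487]: Disconnected from 10.8.4.23 port 35900",
  "2023-03-16T17:27:27+08:00 poc-rsyslog-client sshd[8487]: pam_unix(sshd:session): session closed for user test_user"
]
puts aggregate_logout(sample)
```

Because the map is keyed by the sshd process, two sessions that log out at the same time do not mix their IP and port values, provided the lines of each session arrive in order.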

Thanks for that.

Any idea why the dissect keeps failing after I made some changes to the config?

         mapping => {
            "message" => "<%{syslogPri}>%{[@metadata][ts]} %{+[@metadata][ts]} %{+[@metadata][ts]} %{[@metadata][server]} %{[@metadata][program]}: %{[@metadata][restOfLine]}"
                }
        }

    date { match => [ "[@metadata][ts]", "MMM dd HH:mm:ss"] }

This is the original message:

<86>Mar 23 13:39:05 localhost sshd[17642]: Accepted password for root from 10.8.4.23 port 34526 ssh2

This is the error received in the output:

[2023-03-23T14:25:52,594][WARN ][org.logstash.dissect.Dissector][main][380633d97cd065f1717caec28ce926938f4efe6bd2592e28f71563b4727cea35] Dissector mapping, pattern not found {"field"=>"message", "pattern"=>"<%{syslogPri}>%{[@metadata][ts]} %{+[@metadata][ts]} %{+[@metadata][ts]} %{[@metadata][server]} %{[@metadata][program]}: %{[@metadata][restOfLine]}", "event"=>{"event"=>{"original"=>"<86>Mar 23 14:25:50 localhost sshd[17732]: Received disconnect from 10.8.4.23 port 35262:11: disconnected by user"}, "log"=>{"syslog"=>{"priority"=>86, "facility"=>{"code"=>10, "name"=>"security/authorization"}, "severity"=>{"code"=>6, "name"=>"Informational"}}}, "process"=>{"pid"=>17732, "name"=>"sshd"}, "host"=>{"hostname"=>"localhost", "ip"=>"10.8.246.200"}, "message"=>"Received disconnect from 10.8.4.23 port 35262:11: disconnected by user", "@timestamp"=>2023-03-23T06:25:50.000Z, "@version"=>"1", "service"=>{"type"=>"system"}, "tags"=>["_dissectfailure"]}}
[2023-03-23T14:25:52,595][WARN ][org.logstash.dissect.Dissector][main][380633d97cd065f1717caec28ce926938f4efe6bd2592e28f71563b4727cea35] Dissector mapping, pattern not found {"field"=>"message", "pattern"=>"<%{syslogPri}>%{[@metadata][ts]} %{+[@metadata][ts]} %{+[@metadata][ts]} %{[@metadata][server]} %{[@metadata][program]}: %{[@metadata][restOfLine]}", "event"=>{"event"=>{"original"=>"<86>Mar 23 14:25:50 localhost sshd[17732]: Disconnected from 10.8.4.23 port 35262"}, "log"=>{"syslog"=>{"priority"=>86, "facility"=>{"code"=>10, "name"=>"security/authorization"}, "severity"=>{"code"=>6, "name"=>"Informational"}}}, "process"=>{"pid"=>17732, "name"=>"sshd"}, "host"=>{"hostname"=>"localhost", "ip"=>"10.8.246.200"}, "message"=>"Disconnected from 10.8.4.23 port 35262", "@timestamp"=>2023-03-23T06:25:50.000Z, "@version"=>"1", "service"=>{"type"=>"system"}, "tags"=>["_dissectfailure"]}}
[2023-03-23T14:25:52,596][WARN ][org.logstash.dissect.Dissector][main][380633d97cd065f1717caec28ce926938f4efe6bd2592e28f71563b4727cea35] Dissector mapping, pattern not found {"field"=>"message", "pattern"=>"<%{syslogPri}>%{[@metadata][ts]} %{+[@metadata][ts]} %{+[@metadata][ts]} %{[@metadata][server]} %{[@metadata][program]}: %{[@metadata][restOfLine]}", "event"=>{"event"=>{"original"=>"<86>Mar 23 14:25:50 localhost sshd[17732]: pam_unix(sshd:session): session closed for user root"}, "log"=>{"syslog"=>{"priority"=>86, "facility"=>{"code"=>10, "name"=>"security/authorization"}, "severity"=>{"code"=>6, "name"=>"Informational"}}}, "process"=>{"pid"=>17732, "name"=>"sshd"}, "host"=>{"hostname"=>"localhost", "ip"=>"10.8.246.200"}, "message"=>"pam_unix(sshd:session): session closed for user root", "@timestamp"=>2023-03-23T06:25:50.000Z, "@version"=>"1", "service"=>{"type"=>"system"}, "tags"=>["_dissectfailure"]}}
[2023-03-23T14:25:52,597][WARN ][org.logstash.dissect.Dissector][main][380633d97cd065f1717caec28ce926938f4efe6bd2592e28f71563b4727cea35] Dissector mapping, pattern not found {"field"=>"message", "pattern"=>"<%{syslogPri}>%{[@metadata][ts]} %{+[@metadata][ts]} %{+[@metadata][ts]} %{[@metadata][server]} %{[@metadata][program]}: %{[@metadata][restOfLine]}", "event"=>{"event"=>{"original"=>"<38>Mar 23 14:25:50 localhost systemd-logind: Removed session 81."}, "log"=>{"syslog"=>{"priority"=>38, "facility"=>{"code"=>4, "name"=>"security/authorization"}, "severity"=>{"code"=>6, "name"=>"Informational"}}}, "process"=>{"name"=>"systemd-logind"}, "host"=>{"hostname"=>"localhost", "ip"=>"10.8.246.200"}, "message"=>"Removed session 81.", "@timestamp"=>2023-03-23T06:25:50.000Z, "@version"=>"1", "service"=>{"type"=>"system"}, "tags"=>["_dissectfailure"]}}

You are using the syslog input. This input already applies a grok pattern to extract some fields, as you can see in your error log, so the content of the message field when the event reaches the dissect filter is not the original message you shared.

For example:

[2023-03-23T14:25:52,596][WARN ][org.logstash.dissect.Dissector][main][380633d97cd065f1717caec28ce926938f4efe6bd2592e28f71563b4727cea35] Dissector mapping, pattern not found {"field"=>"message", "pattern"=>"<%{syslogPri}>%{[@metadata][ts]} %{+[@metadata][ts]} %{+[@metadata][ts]} %{[@metadata][server]} %{[@metadata][program]}: %{[@metadata][restOfLine]}", "event"=>{"event"=>{"original"=>"<86>Mar 23 14:25:50 localhost sshd[17732]: pam_unix(sshd:session): session closed for user root"}, "log"=>{"syslog"=>{"priority"=>86, "facility"=>{"code"=>10, "name"=>"security/authorization"}, "severity"=>{"code"=>6, "name"=>"Informational"}}}, "process"=>{"pid"=>17732, "name"=>"sshd"}, "host"=>{"hostname"=>"localhost", "ip"=>"10.8.246.200"}, "message"=>"pam_unix(sshd:session): session closed for user root", "@timestamp"=>2023-03-23T06:25:50.000Z, "@version"=>"1", "service"=>{"type"=>"system"}, "tags"=>["_dissectfailure"]}}

The value of your message is just:

 "message"=>"pam_unix(sshd:session): session closed for user root"

You will probably need to use the tcp or udp input instead of the syslog input.

Try replacing your syslog input with this:

input {
    tcp {
        host => "10.8.4.20"
        port => 55514
    }
    udp {
        host => "10.8.4.20"
        port => 55514
    }
}

Thanks for highlighting that.

I'm getting there, but I am now facing an issue with the "_dateparsefailure" tag.

{
          "host" => {
        "ip" => "10.8.246.200"
    },
       "message" => "<86>Mar 24 12:13:13 localhost sshd[19655]: Accepted password for dcn from 10.8.4.23 port 55064 ssh2",
         "event" => {
        "original" => "<86>Mar 24 12:13:13 localhost sshd[19655]: Accepted password for dcn from 10.8.4.23 port 55064 ssh2"
    },
    "@timestamp" => 2023-03-24T04:13:16.145970385Z,
          "tags" => [
        [0] "_dateparsefailure"
    ],
      "@version" => "1"
}
{
          "host" => {
        "ip" => "10.8.246.200"
    },
       "message" => "<86>Mar 24 12:13:16 localhost sshd[19655]: Disconnected from 10.8.4.23 port 55064 + pam_unix(sshd:session): session closed for user dcn",
         "event" => {
        "original" => "<86>Mar 24 12:13:16 localhost sshd[19655]: pam_unix(sshd:session): session closed for user dcn"
    },
    "@timestamp" => 2023-03-24T04:13:19.238356276Z,
          "tags" => [
        [0] "_dateparsefailure"
    ],
      "@version" => "1"
}
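
A possible explanation for the tag, assuming the filter posted just below is the one that produced this output: the mapping there expects a single-token ISO8601 timestamp at the start of the line, while the message now starts with a priority prefix and a three-token timestamp. The plain-Ruby sketch below imitates how a space-delimited split would carve up such a line; naive_split, iso8601? and header_split are hypothetical helper names, and the split is only a rough stand-in for dissect.

```ruby
require "time"

# Sample line as received through the tcp/udp input (priority prefix intact).
RAW = "<86>Mar 24 12:13:13 localhost sshd[19655]: Accepted password for dcn from 10.8.4.23 port 55064 ssh2"

# Rough stand-in for "%{ts} %{server} %{program}: %{restOfLine}":
# cut at the first two spaces, then at the first ": ".
def naive_split(line)
  ts, server, tail = line.split(" ", 3)
  return nil if tail.nil?
  program, rest = tail.split(": ", 2)
  { ts: ts, server: server, program: program, rest: rest }
end

# True when the text parses as an ISO 8601 timestamp.
def iso8601?(text)
  Time.iso8601(text)
  true
rescue ArgumentError
  false
end

# Pattern that accounts for the <priority> prefix and the three-token timestamp.
HEADER_RE = /\A<(\d+)>(\w{3} +\d+ \d{2}:\d{2}:\d{2}) (\S+) (\S+): (.*)\z/

def header_split(line)
  m = HEADER_RE.match(line)
  return nil unless m
  { pri: m[1], ts: m[2], server: m[3], program: m[4], rest: m[5] }
end

naive = naive_split(RAW)
puts naive[:ts]              # the value the date filter would be asked to parse
puts iso8601?(naive[:ts])
puts header_split(RAW)[:ts]
```

If this is what is happening, a mapping that consumes the priority and all three timestamp tokens, combined with a date pattern such as the MMM dd HH:mm:ss one tried earlier, would be the direction to look at. It would also explain why the rebuilt message keeps the <86> prefix.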

Since the sshd[id] is the same across the first and last syslog messages, I made some changes to the code so that it only sends 2 outputs instead of 3. Here is the updated code.

filter {
    dissect {
        mapping => { "message" => "%{[@metadata][ts]} %{[@metadata][server]} %{[@metadata][program]}: %{[@metadata][restOfLine]}" }
    }
    date { match => [ "[@metadata][ts]", "ISO8601" ] }
    if [@metadata][restOfLine] =~ /Accepted password/ {
        aggregate {
            task_id => "%{[@metadata][program]}"
            map_action => "create"
            code => '
                begin
                    m = event.get("message").match(/Accepted password for (\w+) from (.*) port (\d+)/)
                    map["sourceUserid"] = m[1]
                    map["sourceIp"] = m[2]
                    map["sourcePort"] = m[3]
                rescue
                end
            '
        }
    } else if [@metadata][restOfLine] =~ /session closed/ {
        aggregate {
            map_action => "update"
            end_of_task => true
            timeout => 120
            task_id => "%{[@metadata][program]}"
            code => '
                sourceUserid = map["sourceUserid"]
                sourceIp = map["sourceIp"]
                sourcePort = map["sourcePort"]
                prefixMessage = event.get("[@metadata][ts]") + " " + event.get("[@metadata][server]") + " " + event.get("[@metadata][program]")
                finalMessage = prefixMessage + ": " + "Disconnected from #{sourceIp} port #{sourcePort} + " + event.get("[@metadata][restOfLine]")
                event.set("message", "#{finalMessage}" )
            '
        }
    } else { drop {} }
}

I managed to store a variable for the source user ID, but it isn't used in the final message. It would be good if I could use it to check that the sourceUserid is indeed in [@metadata][restOfLine] in the else if statement, but I'm not sure whether that can be done.

@leandrojmp Any idea if we can use the variable in an if-else statement?

"I manage to store a variable for the source Userid but it has not been utilized in the final message. It will be good if I can use it to ensure that the sourceUserid is indeed in the [@metadata][restOfLine] at the else if statement. But I not sure if it can be done so"

I'm not sure I understand what you want to do now or what your issue is. Can you provide a little more context?

Did you check the documentation on how to access data in logstash?

If I understood correctly you will need something like this:

if [field]  in [anotherfield]
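
One thing to keep in mind: a conditional like the one above compares event fields, while sourceUserid lives in the aggregate map, which is only visible inside the aggregate code block. So either the value is copied onto the event first (for example with event.set on a [@metadata] field), or the check is done in the Ruby code itself. A minimal sketch of the second option, written as a standalone Ruby helper so it can be tried outside Logstash; build_logout_message is a hypothetical name, and the check assumes the session closed line ends with "for user <name>" as in the samples in this thread:

```ruby
# Hypothetical helper: returns the merged logout message, or nil when the
# user remembered at login does not match the user in the session-closed line.
def build_logout_message(map, prefix, rest_of_line)
  user = map["sourceUserid"]
  return nil if user.nil? || !rest_of_line.end_with?("for user #{user}")
  "#{prefix}: Disconnected from #{map["sourceIp"]} port #{map["sourcePort"]} + #{rest_of_line}"
end

map = { "sourceUserid" => "dcn", "sourceIp" => "10.8.4.23", "sourcePort" => "55064" }
prefix = "Mar 24 12:13:16 localhost sshd[19655]"
puts build_logout_message(map, prefix, "pam_unix(sshd:session): session closed for user dcn")
```

Inside the aggregate code block the same test could decide whether to call event.set on the message or, for instance, event.cancel to discard the event when the users do not match.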