Parsing pfSense proxy logs in syslog

Good morning everyone,

I recently deployed a pfSense box and enabled the Squid proxy. I am shipping those logs to my ELK server to process and display in Kibana. I currently have Filebeat running on my stack with the configuration recommended on Elastic's site. That said, I see the logs come in, but the URL is not parsed out into its own field; it only exists inside the message field, which does not allow me to search on it specifically.

If anyone has done something similar or run into the same problem, could you help me through this?

Thank you!

You want to parse Squid logs? Which ones? If it is the cache log then this blog might be helpful. If it is the access log then it is really going to depend on what format Squid is configured to use. Can you show us a couple of lines of the log file and say what each field is?

BTW, if you decide you want to parse pfSense logs as well then this might help you.

Good afternoon Badger and thank you for responding,

I am looking to parse the access logs.

Can you show us a couple of lines of the log file and say what each field is? We cannot help if we do not know what format you are trying to parse.

Sure thing. This was pulled from /var/log/syslog:

Oct  2 10:25:49 _gateway (squid-1): 1601648749.064 181691 192.168.1.200 TCP_TUNNEL/200 4830 CONNECT googleads.g.doubleclick.net:443 - HIER_DIRECT/2607:f8b0:4009:806::2002 -
Oct  2 10:25:49 _gateway (squid-1): 1601648749.065 182164 192.168.1.200 TCP_TUNNEL/200 5970 CONNECT adservice.google.com:443 - HIER_DIRECT/2607:f8b0:4009:806::2002 -
Oct  2 10:25:53 _gateway (squid-1): 1601648753.080 182049 192.168.1.200 TCP_TUNNEL/200 37408 CONNECT ogs.google.com:443 - HIER_DIRECT/2607:f8b0:4009:812::200e -
Oct  2 10:25:54 _gateway (squid-1): 1601648754.086 182194 192.168.1.200 TCP_TUNNEL/200 6559 CONNECT play.google.com:443 - HIER_DIRECT/2607:f8b0:4009:811::200e -

Please let me know if they are enough.

I would use dissect to parse that:

    mutate { gsub => [ "message", "\s+", " " ] }
    dissect { mapping => { "message" => "%{[@metadata][ts]} %{+[@metadata][ts]} %{+[@metadata][ts]} %{hostname} (%{proxyname}): %{anotherTimestamp} %{number1} %{srcIp} %{method}/%{status} %{number2} %{method} %{dst} %{} %{something} %{}" } }
    date { match => [ "[@metadata][ts]", "MMM dd HH:mm:ss" ] }
    date { match => [ "anotherTimestamp", "UNIX" ] target => "anotherTimestamp" }
    mutate { convert => { "number1" => "integer" "number2" => "integer" } }

That will get you events like

         "number1" => 182194,
      "@timestamp" => 2020-10-02T14:25:54.000Z,
             "dst" => "play.google.com:443",
        "hostname" => "_gateway",
       "something" => "HIER_DIRECT/2607:f8b0:4009:811::200e",
         "number2" => 6559,
       "proxyname" => "squid-1",
"anotherTimestamp" => 2020-10-02T14:25:54.086Z,
           "srcIp" => "192.168.1.200",
          "method" => "CONNECT",
          "status" => "200"

The mutate+gsub is to avoid this issue.

You did not explain what the fields were, so I guessed or gave them arbitrary names.
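For what it is worth, if Squid is using its default native logformat then the fields after the syslog header are probably the following (worth confirming against the logformat directive in your squid.conf):

    1601648749.064                         timestamp (UNIX seconds.milliseconds)
    181691                                 elapsed time in milliseconds
    192.168.1.200                          client IP address
    TCP_TUNNEL/200                         Squid result code / HTTP status code
    4830                                   bytes sent to the client
    CONNECT                                request method
    googleads.g.doubleclick.net:443        URL (host:port for CONNECT requests)
    -                                      user ident, "-" when unavailable
    HIER_DIRECT/2607:f8b0:4009:806::2002   hierarchy code / peer address
    -                                      MIME content type, "-" when unavailable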

If you want to split the [something] field like I did with method/status, then you can do so. If you want to retain the fields that are just dashes, then name them instead of using the empty %{} in the dissect.
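For example, splitting [something] could look like this (hierarchyCode and peerAddress are just placeholder names):

    dissect { mapping => { "something" => "%{hierarchyCode}/%{peerAddress}" } }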

If your dates do not have a year in them then Logstash will guess, and sometimes it will guess wrong. See issues 137, 100, and the long discussion on 51.
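When iterating on filters like this, it can help to run them in a throwaway pipeline that reads events from stdin and prints them with the rubydebug codec, so you can paste log lines in and see the parsed result immediately:

    input { stdin {} }
    filter {
        # the mutate/dissect/date filters from above go here
    }
    output { stdout { codec => rubydebug } }

Run it with something like bin/logstash -f test.conf.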

Thank you again Badger, and your fields are accurate! Now where would I insert this into the Logstash configuration to get the effect that I want?

My current syslog configuration is in /logstash/conf.d/10-syslog-filter.conf:

filter {
  if [fileset][module] == "system" {
    if [fileset][name] == "auth" {
      grok {
        match => { "message" => ["%{SYSLOGTIMESTAMP:[system][auth][timestamp]} %{SYSLOGHOST:[system][auth][hostname]} sshd(?:\[%{POSINT:[system][auth][pid]}\])?: %{DATA:[system][auth][ssh][event]} %{DATA:[system][auth][ssh][method]} for (invalid user )?%{DATA:[system][auth][user]} from %{IPORHOST:[system][auth][ssh][ip]} port %{NUMBER:[system][auth][ssh][port]} ssh2(: %{GREEDYDATA:[system][auth][ssh][signature]})?",
                  "%{SYSLOGTIMESTAMP:[system][auth][timestamp]} %{SYSLOGHOST:[system][auth][hostname]} sshd(?:\[%{POSINT:[system][auth][pid]}\])?: %{DATA:[system][auth][ssh][event]} user %{DATA:[system][auth][user]} from %{IPORHOST:[system][auth][ssh][ip]}",
                  "%{SYSLOGTIMESTAMP:[system][auth][timestamp]} %{SYSLOGHOST:[system][auth][hostname]} sshd(?:\[%{POSINT:[system][auth][pid]}\])?: Did not receive identification string from %{IPORHOST:[system][auth][ssh][dropped_ip]}",
                  "%{SYSLOGTIMESTAMP:[system][auth][timestamp]} %{SYSLOGHOST:[system][auth][hostname]} sudo(?:\[%{POSINT:[system][auth][pid]}\])?: \s*%{DATA:[system][auth][user]} :( %{DATA:[system][auth][sudo][error]} ;)? TTY=%{DATA:[system][auth][sudo][tty]} ; PWD=%{DATA:[system][auth][sudo][pwd]} ; USER=%{DATA:[system][auth][sudo][user]} ; COMMAND=%{GREEDYDATA:[system][auth][sudo][command]}",
                  "%{SYSLOGTIMESTAMP:[system][auth][timestamp]} %{SYSLOGHOST:[system][auth][hostname]} groupadd(?:\[%{POSINT:[system][auth][pid]}\])?: new group: name=%{DATA:[system][auth][groupadd][name]}, GID=%{NUMBER:[system][auth][groupadd][gid]}",
                  "%{SYSLOGTIMESTAMP:[system][auth][timestamp]} %{SYSLOGHOST:[system][auth][hostname]} useradd(?:\[%{POSINT:[system][auth][pid]}\])?: new user: name=%{DATA:[system][auth][user][add][name]}, UID=%{NUMBER:[system][auth][user][add][uid]}, GID=%{NUMBER:[system][auth][user][add][gid]}, home=%{DATA:[system][auth][user][add][home]}, shell=%{DATA:[system][auth][user][add][shell]}$",
                  "%{SYSLOGTIMESTAMP:[system][auth][timestamp]} %{SYSLOGHOST:[system][auth][hostname]} %{DATA:[system][auth][program]}(?:\[%{POSINT:[system][auth][pid]}\])?: %{GREEDYMULTILINE:[system][auth][message]}"] }
        pattern_definitions => {
          "GREEDYMULTILINE"=> "(.|\n)*"
        }
        remove_field => "message"
      }
      date {
        match => [ "[system][auth][timestamp]", "MMM  d HH:mm:ss", "MMM dd HH:mm:ss" ]
      }
      geoip {
        source => "[system][auth][ssh][ip]"
        target => "[system][auth][ssh][geoip]"
      }
    }
    else if [fileset][name] == "syslog" {
      grok {
        match => { "message" => ["%{SYSLOGTIMESTAMP:[system][syslog][timestamp]} %{SYSLOGHOST:[system][syslog][hostname]} %{DATA:[system][syslog][program]}(?:\[%{POSINT:[system][syslog][pid]}\])?: %{GREEDYMULTILINE:[system][syslog][message]}"] }
        pattern_definitions => { "GREEDYMULTILINE" => "(.|\n)*" }
        remove_field => "message"
      }
      date {
        match => [ "[system][syslog][timestamp]", "MMM  d HH:mm:ss", "MMM dd HH:mm:ss" ]
      }
    }
  }
}

Your existing syslog grok already parses the syslog timestamp, hostname, and program name, so I would remove those pieces from the dissect. In your 'else if [fileset][name] == "syslog"' branch, perhaps

if "squid" in [system][syslog][program] {
    mutate { gsub => [ "[system][syslog][message]", "\s+", " " ] }
    dissect { mapping => { "[system][syslog][message]" => "%{anotherTimestamp} %{number1} %{srcIp} %{method}/%{status} %{number2} %{method} %{dst} %{} %{something} %{}" } }
    ...