Multiple matches required

I am currently setting up some filters for my incoming Watchguard Firewall logs. The logs come in various different formats so i have to setup multiple match rules.

My current filter is

filter {
#Watchguard logs filter
    if ([type] and [type] == "syslog") and ![dataType] {
      if ([message]) {
        grok {
            match => {
                "message" => [
                  '<%{INT:syslog_pri}>%{MONTH:month} %{MONTHDAY:day} %{TIME:time} %{HOSTNAME:hostname} %{WORD:serial_number} \(%{TIMESTAMP_ISO8601:timestamp}\) %{NOTSPACE:process} msg_id="%{DATA:msg_id}" %{WORD:action} %{NOTSPACE:src_network} %{NOTSPACE:dst_network} %{NUMBER:packet_length:int} %{NOTSPACE:protocol} %{NUMBER:metric:int} %{NUMBER:ttl:int} %{IP:source_ip} %{IP:destination_ip} %{NUMBER:source_port:int} %{NUMBER:destination_port:int} %{WORD} %{INT} %{WORD} %{INT} %{WORD} %{INT} src_user=\"%{DATA:src_user}\" dst_user=\"%{DATA:dst_user}\" %{GREEDYDATA:rule_name}'
                ]
            }
			match => {
                "message" => [
                  '<%{INT:syslog_pri}>%{MONTH:month} %{MONTHDAY:day} %{TIME:time} %{HOSTNAME:hostname} %{WORD:serial_number} \(%{TIMESTAMP_ISO8601:timestamp}\) %{NOTSPACE:process} msg_id="%{DATA:msg_id}" %{WORD:action} %{NOTSPACE:src_network} %{NOTSPACE:dst_network} %{NUMBER:packet_length:int} %{NOTSPACE:protocol} %{NUMBER:metric:int} %{NUMBER:ttl:int} %{IP:source_ip} %{IP:destination_ip} %{NUMBER:source_port:int} %{NUMBER:destination_port:int} %{WORD} %{INT} %{WORD} %{INT} %{WORD} %{INT} src_user=\"%{DATA:src_user}\" %{GREEDYDATA:rule_name}'
                ]
            }
			match => {
                "message" => [
                  '<%{INT:syslog_pri}>%{MONTH:month} %{MONTHDAY:day} %{TIME:time} %{HOSTNAME:hostname} %{WORD:serial_number} \(%{TIMESTAMP_ISO8601:timestamp}\) %{NOTSPACE:process} msg_id="%{DATA:msg_id}" %{WORD:action} %{NOTSPACE:src_network} %{NOTSPACE:dst_network} %{NUMBER:packet_length:int} %{NOTSPACE:protocol} %{NUMBER:metric:int} %{NUMBER:ttl:int} %{IP:source_ip} %{IP:destination_ip} %{NUMBER:source_port:int} %{NUMBER:destination_port:int} geo_dst=\"%{WORD:geo_dst}\" %{GREEDYDATA:rule_name}'
                ]
            }
			match => {
                "message" => [
                  '<%{INT:syslog_pri}>%{MONTH:month} %{MONTHDAY:day} %{TIME:time} %{HOSTNAME:hostname} %{WORD:serial_number} \(%{TIMESTAMP_ISO8601:timestamp}\) %{NOTSPACE:process} msg_id="%{DATA:msg_id}" %{WORD:action} %{NOTSPACE:src_network} %{NOTSPACE:dst_network} %{NUMBER:packet_length:int} %{NOTSPACE:protocol} %{NUMBER:metric:int} %{NUMBER:ttl:int} %{IP:source_ip} %{IP:destination_ip} %{NUMBER:source_port:int} %{NUMBER:destination_port:int} %{WORD} %{INT} %{WORD} %{INT} %{WORD} %{INT} geo_dst=\"%{WORD:geo_dst}\" src_user=\"%{DATA:src_user}\" %{GREEDYDATA:rule_name}'
                ]
            }
			match => {
                "message" => [
                  '<%{INT:syslog_pri}>%{MONTH:month} %{MONTHDAY:day} %{TIME:time} %{HOSTNAME:hostname} %{WORD:serial_number} \(%{TIMESTAMP_ISO8601:timestamp}\) %{NOTSPACE:process} msg_id="%{DATA:msg_id}" %{WORD:action} %{NOTSPACE:src_network} %{NOTSPACE:dst_network} %{NUMBER:packet_length:int} %{NOTSPACE:protocol} %{NUMBER:metric:int} %{NUMBER:ttl:int} %{IP:source_ip} %{IP:destination_ip} %{NUMBER:source_port:int} %{NUMBER:destination_port:int} %{WORD} %{INT} %{WORD} %{INT} %{WORD} %{INT} geo_src=\"%{WORD:geo_src}\" geo_dst=\"%{WORD:geo_dst}\" %{GREEDYDATA:rule_name}'
                ]
            }
			match => {
                "message" => [
                  '<%{INT:syslog_pri}>%{MONTH:month} %{MONTHDAY:day} %{TIME:time} %{HOSTNAME:hostname} %{WORD:serial_number} \(%{TIMESTAMP_ISO8601:timestamp}\) %{NOTSPACE:process} msg_id="%{DATA:msg_id}" %{WORD:action} %{NOTSPACE:src_network} %{NOTSPACE:dst_network} %{NUMBER:packet_length:int} %{NOTSPACE:protocol} %{NUMBER:metric:int} %{NUMBER:ttl:int} %{IP:source_ip} %{IP:destination_ip} %{NUMBER:source_port:int} %{NUMBER:destination_port:int} src_user=\"%{DATA:src_user}\" %{GREEDYDATA:rule_name}'
                ]
            }
			match => {
                "message" => [
                  '<%{INT:syslog_pri}>%{MONTH:month} %{MONTHDAY:day} %{TIME:time} %{HOSTNAME:hostname} %{WORD:serial_number} \(%{TIMESTAMP_ISO8601:timestamp}\) %{NOTSPACE:process} msg_id="%{DATA:msg_id}" %{WORD:action} %{NOTSPACE:src_network} %{NOTSPACE:dst_network} %{NUMBER:packet_length:int} %{NOTSPACE:protocol} %{NUMBER:metric:int} %{NUMBER:ttl:int} %{IP:source_ip} %{IP:destination_ip} %{NUMBER:source_port:int} %{NUMBER:destination_port:int} %{WORD} %{INT} %{WORD} %{INT} %{WORD} %{INT} %{GREEDYDATA:rule_name}'
                ]
            }
			match => {
                "message" => [
                  '<%{INT:syslog_pri}>%{MONTH:month} %{MONTHDAY:day} %{TIME:time} %{HOSTNAME:hostname} %{WORD:serial_number} \(%{TIMESTAMP_ISO8601:timestamp}\) %{NOTSPACE:process} msg_id="%{DATA:msg_id}" %{WORD:action} %{NOTSPACE:src_network} %{NOTSPACE:dst_network} %{NUMBER:packet_length:int} %{NOTSPACE:protocol} %{NUMBER:metric:int} %{NUMBER:ttl:int} %{IP:source_ip} %{IP:destination_ip} %{NUMBER:source_port:int} %{NUMBER:destination_port:int} %{WORD} %{INT} %{WORD} %{INT} %{WORD} %{INT} geo_dst=\"%{WORD:geo_dst}\" geo="%{DATA}" msg=\"%{DATA:msg}\" src_user=\"%{DATA:src_user}\" %{GREEDYDATA:rule_name}'
                ]
            }
			match => {
                "message" => [
                  '<%{INT:syslog_pri}>%{MONTH:month} %{MONTHDAY:day} %{TIME:time} %{HOSTNAME:hostname} %{WORD:serial_number} \(%{TIMESTAMP_ISO8601:timestamp}\) %{NOTSPACE:process} msg_id="%{DATA:msg_id}" %{WORD:action} %{NOTSPACE:src_network} %{NOTSPACE:dst_network} %{NUMBER:packet_length:int} %{NOTSPACE:protocol} %{NUMBER:metric:int} %{NUMBER:ttl:int} %{IP:source_ip} %{IP:destination_ip} %{NUMBER:source_port:int} %{NUMBER:destination_port:int} %{WORD} %{INT} %{WORD} %{INT} %{WORD} %{INT} dst_user=\"%{DATA:dst_user}\" %{GREEDYDATA:rule_name}'
                ]
            }
			match => {
                "message" => [
                  '<%{INT:syslog_pri}>%{MONTH:month} %{MONTHDAY:day} %{TIME:time} %{HOSTNAME:hostname} %{WORD:serial_number} \(%{TIMESTAMP_ISO8601:timestamp}\) %{NOTSPACE:process} msg_id="%{DATA:msg_id}" %{WORD:action} %{NOTSPACE:src_network} %{NOTSPACE:dst_network} %{NOTSPACE:protocol} %{IP:source_ip} %{IP:destination_ip} %{NUMBER:source_port:int} %{NUMBER:destination_port:int} msg=\"%{DATA:process}\" proxy_act=\"%{DATA:proxy_action}\" op=\"%{DATA}\" dstname=\"%{DATA:dst_name}\" arg=\"%{DATA}\" sent_bytes=\"%{DATA:sent_bytes}\" rcvd_bytes=\"%{DATA:received_bytes}\" elapsed_time=\"%{DATA}\" reputation=\"%{DATA}\" geo_dst=\"%{WORD:geo_dst}\" src_user=\"%{DATA:src_user}\" %{GREEDYDATA:rule_name}'
                ]
            }
			match => {
                "message" => [
                  '<%{INT:syslog_pri}>%{MONTH:month} %{MONTHDAY:day} %{TIME:time} %{HOSTNAME:hostname} %{WORD:serial_number} \(%{TIMESTAMP_ISO8601:timestamp}\) %{NOTSPACE:process} msg_id="%{DATA:msg_id}" %{WORD:action} %{NOTSPACE:src_network} %{NOTSPACE:dst_network} %{NOTSPACE:protocol} %{IP:source_ip} %{IP:destination_ip} %{NUMBER:source_port:int} %{NUMBER:destination_port:int} msg=\"%{DATA:process}\" proxy_act=\"%{DATA:proxy_action}\" tls_profile=\"%{DATA:tls_profile}\" tls_version=\"%{DATA:tls_version}\" sni=\"%{DATA:dst_address}\" cn=\"%{DATA}\" cert_issuer=\"%{DATA}\" cert_subject=\"%{DATA}\" action=\"%{DATA:action}\" app_id=\"%{DATA}\" app_cat_id=\"%{DATA}\" sig_vers=\"%{DATA}\" sent_bytes=\"%{DATA:sent_bytes}\" rcvd_bytes=\"%{DATA:rcvd_bytes}\" geo_dst=\"%{WORD:geo_dst}\" src_user=\"%{DATA:src_user}\" %{GREEDYDATA:rule_name}'
                ]
            }
        }
      }
    }
    if [serial_number]{
        mutate {
            remove_field => [ "message" ]
            add_field => {"[dataType]" => "watchguard-firewall"}
			add_field => { "dataSource" => "%{hostname}" }
        }
    }
}

A couple of issues i have is.

1.) Rule_name is always the last bit of data and is in the format of (RULE-NAME) but if i change %{GREEDYDATA:rule_name} to (%{DATA:rule_name}) it never matches even though if i run it through an online debugger it works ok.

2.) because im using %{GREEDYDATA:rule_name} sometimes it is matching an incorrect rule so the field rule_name contains more than it should. For example in some cases it maches rule_name as USER@NAME & RULE-NAME even though there is a match that should capture this. Are the matches processed in order and if it maches one it will no longer try to match another and if so in which order.

Example of logs:

<140>May 11 11:10:42 FIREWALL-NAME FIREWALLSERIALNO (2020-01-1T00:00:00) firewall: msg_id="3000-0148" Allow SRCNETWORK DSTNETWORK 52 tcp 20 127 SRCIP DSTIP 60354 443 offset 8 S 4292267136 win 61690 geo_dst="USA" src_user="USER@NAME" (RULE-NAME)

<140>May 11 11:33:27 FIREWALL-NAME FIREWALLSERIALNO (2020-01-1T00:05:00) firewall: msg_id="3000-0148" Allow SRCNETWORK DSTNETWORK 52 tcp 20 126 SRCIP DSTIP 60503 9101 offset 8 S 895650903 win 61690 src_user="USER@NAME" (RULE-NAME)

<142>May 11 11:34:16 FIREWALL-NAME FIREWALLSERIALNO (2020-01-1T00:10:00) https-proxy[2914]: msg_id="2CFF-000A" Allow SRCNETWORK DSTNETWORK tcp SRCIP DSTIP 55280 443 msg="ProxyAllow: HTTPS content inspection exception list match" proxy_act="HTTPS-Client.Standard.1" sni="client.wns.windows.com" cn="*.wns.windows.com" exception_rule="*.windows.com" action="allow" geo_dst="GBR" src_user="USER@NAME" (RULE-NAME)

Maybe, it depends on the logstash version. If you specify an option more than once then logstash will combine them, usually in the way you would expect, but sometimes not. So do not do that. Instead of

grok {
    match => { "message" => "pattern1" }
    match => { "message" => "pattern2" }
    match => { "message" => "pattern3" }
}

use

grok {
    match => { 
        "message" => [
            "pattern1",
            "pattern2",
            "pattern3"
        ]
    }
}

Arrays are ordered, so that will test them in the order you specify. The break_on_match option controls whether it will continue testing other patterns after finding a match.

This topic was automatically closed 28 days after the last reply. New replies are no longer allowed.