Filters Not Working At All

I have the following filter set up to drop certain logs and add tags to syslog events from my Sophos XG550 firewalls:

input {
  tcp {
    port => 6000
    type => syslog
  }
  udp {
    port => 6000
    type => syslog
  }
}

filter {
  # Drop DHCP packets
  if [log_component] == "DHCP Server" {
    drop { }
  }
}

filter {
  if [device_name] == "XG550" {
    mutate {
      add_tag => [ "Sophos" ]
    }
  }
}

filter {
  if "Sophos" in [tags] {
    ...
  }
}
...

The filters that drop the DHCP Server traffic and add the Sophos tag are not working. Since I use the Sophos tag to gate the output later in the config, nothing gets written at all. The only way I can get it to work is to add the tag as part of the input. Since I'll probably end up reusing the input block for other syslog sources, I don't want to do that.
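For reference, the workaround looks something like this (a sketch — tagging via the standard tags option on each input plugin, which applies the tag to every event that input receives):

input {
  tcp {
    port => 6000
    type => syslog
    # every event arriving on this input gets the tag, which is the
    # problem: any future device sent to port 6000 would be tagged too
    tags => ["Sophos"]
  }
  udp {
    port => 6000
    type => syslog
    tags => ["Sophos"]
  }
}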

Am I missing something? Here's the raw data of a message received:

{
  "_index": "sophos_xg-2020.03.04",
  "_type": "doc",
  "_id": "JGMLp3ABhBEk_dXgm73n",
  "_version": 1,
  "_score": null,
  "_source": {
    "raw_data": "192.168.10.161\tWed 04 Mar 19:18:55 2020\tWed 04 Mar 23:18:55 2020\tb0:35:9f:xx:xx:xx\tDESKTOP-xxxxx",
    "<30>device": "SFW",
    "date": "2020-03-04",
    "log_subtype": "System",
    "device_name": "XG550",
    "log_component": "DHCP Server",
    "tags": [
      "_geoip_lookup_failure"
    ],
    "timezone": "GMT",
    "log_id": "063411660020",
    "@version": "1",
    "priority": "Information",
    "ipaddress": "192.168.10.161",
    "client_physical_address": "b0:35:9f:xx:xx:xx",
    "message": "Lease IP 192.168.10.161 renewed for MAC b0:35:9f:xx:xx:xx",
    "type": "syslog",
    "time": "19:35:20",
    "device_id": "xxxxxxxxxxx",
    "log_type": "Event",
    "host": "172.20.200.50",
    "status": "Renew",
    "@timestamp": "2020-03-04T19:35:20.476Z"
  },
  "fields": {
    "date": [
      "2020-03-04T00:00:00.000Z"
    ],
    "@timestamp": [
      "2020-03-04T19:35:20.476Z"
    ]
  },
  "highlight": {
    "log_subtype": [
      "@kibana-highlighted-field@System@/kibana-highlighted-field@"
    ]
  },
  "sort": [
    1583350520476
  ]
}

At what point in the filter chain is the log_component field created? Are you by any chance trying to use it before it exists?

It's the first element filtered on after the input. log_component is part of the fields coming from the device, so I'm not sure what you mean by created.

I did not see any codec on the inputs listed, so I was not sure where those fields would get extracted. It may help if you can show your full config.

Here you go:

input {
  tcp {
    port => 6000
    type => syslog
  }
  udp {
    port => 6000
    type => syslog
  }
}

filter {
  # Drop DHCP packets
  if [log_component] == "DHCP Server" {
    drop { }
  }
}

filter {
  if [device_name] == "XG550" {
    mutate {
      add_tag => [ "Sophos" ]
    }
  }
}

filter {
  if "Sophos" in [tags] {

    mutate {
      # replace all "= " with ="" to explicitly indicate an empty value
      gsub => [
        "message", "= ", '="" '
      ]
    }
    kv {
      id => "sophos_kv"
      source => "message"
      trim_key => " "
      trim_value => " "
      value_split => "="
      field_split => " "
    }

    # WAF uses sourceip
    if [sourceip] {
      mutate {
        add_field => { "src_ip" => "%{sourceip}" }
        remove_field => [ "sourceip" ]
      }
    }

    # WAF uses localip for the destination
    if [localip] {
      mutate {
        add_field => { "dst_ip" => "%{localip}" }
        remove_field => [ "localip" ]
      }
    }

    # now check if the source IP is a private IP; if so, tag it
    if [src_ip] {
      cidr {
        add_tag => [ "src_internalIP" ]
        address => [ "%{src_ip}" ]
        network => [ "10.0.0.0/8", "172.16.0.0/12", "192.168.0.0/16" ]
      }
    }

    # don't run geoip if it's an internal IP; otherwise find the GeoIP location
    if "src_internalIP" not in [tags] {
      geoip {
        add_tag => [ "src_geoip" ]
        source => "src_ip"
      }
    }
    else {
      # check the DST IP now; if it is a private IP, tag it
      if [dst_ip] {
        cidr {
          add_tag => [ "dst_internalIP" ]
          address => [ "%{dst_ip}" ]
          network => [ "10.0.0.0/8", "172.16.0.0/12", "192.168.0.0/16" ]
        }
      }

      # don't run geoip if it's an internal IP; otherwise find the GeoIP location
      if "dst_internalIP" not in [tags] {
        geoip {
          add_tag => [ "dst_geoip" ]
          source => "dst_ip"
        }
      }
    }
  }
}

output {
  if "Sophos" in [tags] {
    elasticsearch { 
      hosts => ["localhost:9200"] 
      index => "sophos_xg-%{+YYYY.MM.dd}"
    }
  }
}

Can you try removing all filters and just output the events received by the inputs to stdout with a rubydebug codec so we can see the exact format at the point the filter runs? Do you have any other config files in the directory that could be interfering? The sample event you showed does not seem to have Sophos present in the tags field, so should not be written to that index by that config file.
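For that test, a minimal debug pipeline might look something like this (a sketch; keep your existing inputs and replace everything else):

output {
  # print each event exactly as Logstash sees it after the input stage
  stdout { codec => rubydebug }
}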

Sorry for the delay ... work has been a little crazy with the COVID-19 stuff...

Here's an example event written to a file with rubydebug with no filters or tags:

{
          "host" => "172.20.200.50",
       "message" => "<30>device=\"SFW\" date=2020-03-12 time=13:17:18 timezone=\"GMT\" device_name=\"XG550\" device_id=C5000xxxxxxxxxx log_id=010202601001 log_type=\"Firewall\" log_component=\"Invalid Traffic\" log_subtype=\"Denied\" status=\"Deny\" priority=Information duration=0 fw_rule_id=0 policy_type=0 user_name=\"\" user_gp=\"\" iap=0 ips_policy_id=0 appfilter_policy_id=0 application=\"\" application_risk=0 application_technology=\"\" application_category=\"\" in_interface=\"\" out_interface=\"\" src_mac= src_ip=1.2.3.4 src_country_code= dst_ip=5.6.7.8 dst_country_code= protocol=\"TCP\" src_port=24720 dst_port=80 sent_pkts=0  recv_pkts=0 sent_bytes=0 recv_bytes=0 tran_src_ip= tran_src_port=0 tran_dst_ip= tran_dst_port=0 srczonetype=\"\" srczone=\"\" dstzonetype=\"\" dstzone=\"\" dir_disp=\"\" connid=\"\" vconnid=\"\" hb_health=\"No Heartbeat\" message=\"Could not associate packet to any connection.\" appresolvedby=\"Signature\" app_is_cloud=0",
          "type" => "syslog",
    "@timestamp" => 2020-03-12T13:17:18.784Z,
      "@version" => "1"
}

The sample event you showed does not seem to have Sophos present in the tags field, so should not be written to that index by that config file.

That's kind of the point of this post. I want to tag the messages with Sophos based on a field so they get processed further. I've been working around this by adding the tag as part of the input header, but if I want to add additional devices using syslog input they would get tagged with Sophos as well.

Well, as you can see from your last post, the fields you are checking in your filters are not present in your event. You first need to parse the message field to extract those fields; only then can you apply conditional logic to them.

You are telling Logstash if [foo_field] == "bar" { do this }, but Logstash is only seeing the ["host", "message", "type"] fields. There is no foo_field from its point of view.

To be honest, I don't know how you ended up with all those fields parsed, as shown in one of your previous posts.

Finally, you do not need a separate filter section for each field. You can use a single filter section and put all your filters in it.
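For example, the first two sections from your config could be merged into one (a sketch of the same logic):

filter {
  # Drop DHCP packets
  if [log_component] == "DHCP Server" {
    drop { }
  }
  # tag events from the XG550
  if [device_name] == "XG550" {
    mutate {
      add_tag => [ "Sophos" ]
    }
  }
}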


I'm not really sure either. This was something I pulled from someone on GitHub. What's weird is that once I add the Sophos tag in the input section, it all works as expected.

Reading through the docs for the syslog input plugin on the Elastic site, it looks like that plugin may be parsing the fields in the message field automatically?

EDIT: It looks like this might be the key:

kv {
  id => "sophos_kv"
  source => "message"
  trim_key => " "
  trim_value => " "
  value_split => "="
  field_split => " "
}

This is what parses the key=value pairs in the message field and creates the individual fields that Logstash can then work with.
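To illustrate, given a fragment of the message shown earlier, the kv filter would (roughly) produce top-level fields like this — a simplified sketch:

# message: device_name="XG550" log_component="Invalid Traffic" src_ip=1.2.3.4
kv {
  source => "message"
  value_split => "="
  field_split => " "
}
# resulting event fields (approximately; kv strips the surrounding quotes):
#   device_name   => "XG550"
#   log_component => "Invalid Traffic"
#   src_ip        => "1.2.3.4"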

Well, this is different from what you posted. You are saying that to make it work you have to add some tags, which may well be the case, since Logstash can recognize some standard formats based on the tags.

But looking at the pipeline you previously posted, with an input like this

input {
  tcp {
    port => 6000
    type => syslog
  }
  
  udp {
    port => 6000
    type => syslog
  }
}

and no parsing filters (json, kv, grok), it couldn't work. In fact, it returns this:

{
          "host" => "172.20.200.50",
       "message" => "<30>device=\"SFW\" date=2020-03-12 time=13:17:18 timezone=\"GMT\" device_name=\"XG550\" device_id=C5000xxxxxxxxxx log_id=010202601001 log_type=\"Firewall\" log_component=\"Invalid Traffic\" log_subtype=\"Denied\" status=\"Deny\" priority=Information duration=0 fw_rule_id=0 policy_type=0 user_name=\"\" user_gp=\"\" iap=0 ips_policy_id=0 appfilter_policy_id=0 application=\"\" application_risk=0 application_technology=\"\" application_category=\"\" in_interface=\"\" out_interface=\"\" src_mac= src_ip=1.2.3.4 src_country_code= dst_ip=5.6.7.8 dst_country_code= protocol=\"TCP\" src_port=24720 dst_port=80 sent_pkts=0  recv_pkts=0 sent_bytes=0 recv_bytes=0 tran_src_ip= tran_src_port=0 tran_dst_ip= tran_dst_port=0 srczonetype=\"\" srczone=\"\" dstzonetype=\"\" dstzone=\"\" dir_disp=\"\" connid=\"\" vconnid=\"\" hb_health=\"No Heartbeat\" message=\"Could not associate packet to any connection.\" appresolvedby=\"Signature\" app_is_cloud=0",
          "type" => "syslog",
    "@timestamp" => 2020-03-12T13:17:18.784Z,
      "@version" => "1"
}

In fact, you do not have any tags field here, which means you can never enter that if "Sophos" in [tags] condition.

BUT, if you add the Sophos tag in the input section (unlike the config you previously posted), you do of course enter that condition and the key-value parsing filter is applied. That's why you get all those parsed fields in your Elasticsearch output. It also means you have to put this

# Drop DHCP packets
if [log_component] == "DHCP Server" {
  drop { }
}

AFTER the kv filter, otherwise Logstash won't find the log_component field, since it won't have been parsed yet.
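Putting it all together, the reordered pipeline would look something like this (a sketch, using the per-input tag from your workaround; the key point is that kv runs before any conditional that references a parsed field):

input {
  tcp {
    port => 6000
    type => syslog
    tags => ["Sophos"]
  }
  udp {
    port => 6000
    type => syslog
    tags => ["Sophos"]
  }
}

filter {
  if "Sophos" in [tags] {
    # parse key=value pairs first so fields like log_component exist
    kv {
      source => "message"
      value_split => "="
      field_split => " "
    }
    # Drop DHCP packets -- now AFTER kv, so log_component is populated
    if [log_component] == "DHCP Server" {
      drop { }
    }
  }
}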
