Creating drop filters based on a string search in a message field


(David Vosbury) #1

I'm new to the Elastic stack and Logstash. I have a new Logstash instance that accepts logs from Beats and sends them to Elasticsearch. I'm trying to create a filter that drops some logs we aren't interested in, based on a string in the log message. I've spent the better part of a day googling, reading the docs, and trying different things, like an if statement based on a regex or a match => with a grok filter. Nothing I've tried has worked so far. The logs I'm trying to filter out still show up in Kibana when I search on the string, so the filter seems to be effectively ignored in my config file. I would have thought that if the filter were incorrect, Logstash would refuse to restart/reload, but no matter what I put in the filter, Logstash happily restarts and never drops the intended log messages.

Here is the contents of my logstash.conf file:

input {
  beats {
    port => 5044
  }
}

filter {

  if [message] =~ ".*ASA-6-302013*."] { drop{} }

}

output {
  elasticsearch {
    hosts => "localhost:9200"
    manage_template => false
    index => "%{[@metadata][beat]}-%{+YYYY.MM.dd}"
    document_type => "%{[@metadata][type]}"
  }
}


(Magnus Bäck) #2
if [message] =~ ".*ASA-6-302013*."] { drop{} }
  1. There's an extra square bracket. I doubt Logstash starts up properly with that in place.
  2. Regular expressions should be surrounded by slashes, not quotes.
  3. Do you really intend for it to end with *. and not .*?
  4. A leading and trailing .* serves no purpose.
  5. If you're only doing a substring check you don't even need a regexp.

So, assuming that you actually meant .* (question 3) you can do this:

if "ASA-6-302013" in [message] { drop { } }
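(And if you do end up needing an actual regexp match rather than a substring check, the condition would look something like this sketch, using the message ID from your config; note the slashes instead of quotes:)

```
filter {
  # drop any event whose message field matches the pattern
  if [message] =~ /ASA-6-302013/ { drop { } }
}
```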

(David Vosbury) #3

Thanks for the reply, Magnus. I'll try your suggestion. I had originally gone down this path, since it seemed like the simplest way to achieve my objective, but it didn't work; I may have had some kind of syntax error, though. I'll try it exactly as in your example and report back. Eventually I ended up with the mess that I posted: I was looking for regex examples, but didn't find any that worked after I couldn't get the substring method to work.


(David Vosbury) #4

I've changed my config to this:

input {
  beats {
    port => 5044
  }
}

filter {

  if "ASA-6-302013" in [message] { drop{ } }

}

output {
  elasticsearch {
    hosts => "localhost:9200"
    manage_template => false
    index => "%{[@metadata][beat]}-%{+YYYY.MM.dd}"
    document_type => "%{[@metadata][type]}"
  }
}

Unfortunately, when searching my Filebeat index in Kibana I still see lines with this string in the message. Is there something wrong with the rest of my config file?


(Magnus Bäck) #5

Please show a message that you believe slipped through. Use a stdout { codec => rubydebug } output or copy/paste from the JSON tab in Kibana. Do not post any screenshots.


(David Vosbury) #6

Magnus,

Here is an example, in JSON, of a message that I think my filter should have dropped.

{
  "_index": "filebeat-2016.10.27",
  "_type": "log",
  "_id": "AVgGKnWxk4LkXWWZLcu-",
  "_score": null,
  "_source": {
    "message": "Oct 27 12:43:28 asa10 : %ASA-6-302013: Built inbound TCP connection 1033541997 for outside:10.150.0.0./46742 (10.150.0.0/46742) to inside.customer:172.26.0.0/8080 (161.215.0.0/8080)",
    "@version": "1",
    "@timestamp": "2016-10-27T12:43:29.155Z",
    "input_type": "log",
    "count": 1,
    "beat": {
      "hostname": "syslog",
      "name": "syslog"
    },
    "source": "/var/log/syslog",
    "offset": 181011926,
    "type": "log",
    "fields": null,
    "host": "syslog",
    "tags": [
      "beats_input_codec_plain_applied"
    ]
  },
  "fields": {
    "@timestamp": [
      1477572209155
    ]
  },
  "sort": [
    1477572209155
  ]
}

Am I just not using the right field to filter on?

David


(Magnus Bäck) #7

There's nothing wrong with the filter. Perhaps you're not running with the configuration you expect?

$ cat test.config 
input { stdin { } }
output { stdout { codec => rubydebug } }
filter {
  if "ASA-6-302013" in [message] { drop{ } }
}
$ ( echo 'first message' ; echo 'Oct 27 12:43:28 asa10 : %ASA-6-302013: Built inbound TCP connection 1033541997 for outside:10.150.0.0./46742 (10.150.0.0/46742) to inside.customer:172.26.0.0/8080 (161.215.0.0/8080)' ; echo 'third message' ) | /opt/logstash/bin/logstash -f test.config
Settings: Default pipeline workers: 8
Pipeline main started
{
       "message" => "first message",
      "@version" => "1",
    "@timestamp" => "2016-10-27T12:59:19.746Z",
          "host" => "lnxolofon"
}
{
       "message" => "third message",
      "@version" => "1",
    "@timestamp" => "2016-10-27T12:59:19.780Z",
          "host" => "lnxolofon"
}
Pipeline main has been shutdown
stopping pipeline {:id=>"main"}

Skip indexing based on field value
(David Vosbury) #8

That was exactly the problem. I was not saving the configuration to the proper place. Argh! Thanks so much for your help.


(David Vosbury) #9

Actually, after monitoring this for a little while, I realized that the filter is dropping everything. Why would that be?


(Vinod Hy) #10

Extending this question: can we add a condition so that logs are only sent when a field matches a particular string?
For Eg:
filter {
  grok {
    match => { "message" => "%{IP:client} %{NUMBER:duration} %{GREEDYDATA:messageFromClient}" }
  }
  kv {
    source => "keyval"
    field_split => ","
    remove_field => [ "keyval" ]
  }
}

With the above grok filter, I want to send logs only when the client field matches a particular IP. All other IPs should be dropped.


(Magnus Bäck) #11

@vinod_hy, please start new threads for new questions. But yes, you can put conditions around outputs too. See https://www.elastic.co/guide/en/logstash/current/event-dependent-configuration.html.
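As a sketch of what that could look like (the IP address below is hypothetical, and this assumes the grok filter above has already populated the client field):

```
output {
  # only index events whose extracted client field matches this IP;
  # all other events fall through and are not sent anywhere
  if [client] == "192.168.1.10" {
    elasticsearch {
      hosts => "localhost:9200"
    }
  }
}
```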


(Vinod Hy) #12

I am sorry; I'll make sure to create a new topic. And thanks for the link, I'll go through it.