Guidance on filtering in the logstash.conf file


Hi, I've spent a couple of days looking at filtering and probably just cannot get the basic nuances for what I want to do to make it work.

Basic set-up: I have auditbeat running on a Linux server, sending over to logstash on another Linux server that has ELK set up on it. We want to do the filtering on the ELK server rather than at the auditbeat end.

The set-up in auditbeat uses the audit module with a number of auditd rules. These are happily being received by ELK and display happily in Kibana. The problem is that 90+% of what is being received is noise, and I want to use logstash to filter out the noise before it gets to ES.

I've tried a number of filters that just seemed to be ignored and still send everything through. Examples of good filters on the internet are few and far between. What I need is an example of a working one that I can then build on and expand. It is just getting that first one that is eluding me.

In the message there is an ID field, and I want to filter out any events that have nagios as the ID.

Any help gratefully received.

(I've not put any examples of what I've tried as I have no idea if any are in anyway close to being right!)

(Makara) #2

Hi @nlh

You can write your own custom grok to extract the pattern you are interested in, such as the NAGIOS
ID. This will ensure removal of the noise before it gets passed to ES.

filter {
  grok {
    match => { "message" => "%{NID:YourPattern}" }
  }
}
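One caveat: NID is not one of the grok patterns that ship with Logstash, so it would have to be defined before it can be used. A minimal sketch, assuming an inline pattern_definitions entry (the regex and the service_id field name are only illustrations, not from this thread):

```conf
filter {
  grok {
    # "NID" is a custom pattern declared inline here; the regex is
    # only an assumption (a lowercase word such as "nagios")
    pattern_definitions => { "NID" => "[a-z]+" }
    match => { "message" => "%{NID:service_id}" }
  }
  # drop any event whose extracted id is nagios
  if [service_id] == "nagios" {
    drop { }
  }
}
```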


Thanks @Makara

Tried something similar, for example:

filter {
  grok {
    match => { "message" => "%{}"
      drop { }

But get the following error on starting logstash:

[ERROR][logstash.agent ] Cannot create pipeline {:reason=>"Expected one of #, => at line 13, column 12 (byte 321) after filter {\n grok {\n match => { "message" => "%{}"\n drop "}

Line 13 is the drop.

With the following:

filter {
  drop {
    grok {
      match => { "message" => "%{}" }

Get the following error:

[ERROR][logstash.agent ] Cannot create pipeline {:reason=>"Expected one of #, => at line 12, column 10 (byte 179) after filter {\n drop {\n grok "}

Line 12 is the grok.
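The underlying issue in both attempts is the nesting: drop is a filter plugin in its own right, so it sits alongside grok inside the filter block (usually guarded by a conditional), never inside it. A sketch, with a hypothetical msg_text capture name:

```conf
filter {
  grok {
    # capture the whole message into a field we can test against
    match => { "message" => "%{GREEDYDATA:msg_text}" }
  }
  # drop is a sibling of grok wrapped in a conditional, not nested inside it
  if [msg_text] =~ /nagios/ {
    drop { }
  }
}
```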

So my guess is (and apologies) that I am missing some basic understanding of how these work. Once I can get a working version, then I'm sure (OK, sincerely hope!) the rest will fall into place.

Just trying to exclude messages received by logstash so they are not sent to elastic search.

Even when I have created ones that do not error, the messages are still getting through to elastic search.
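Worth noting here: a grok on its own never removes anything — if it fails to match it just adds a _grokparsefailure tag and the event carries on to Elasticsearch. Events only stop when a conditional actually calls drop. The simplest version, assuming the string nagios appears somewhere in the raw message:

```conf
filter {
  # the "in" operator does a substring test on string fields, so this
  # discards any event whose raw message contains "nagios"
  if "nagios" in [message] {
    drop { }
  }
}
```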


Wondering if I have just twigged something in relation to grok. I have a predefined template in Elasticsearch, and it is probably this that I am seeing, which enables me to see the following:

t @timestamp                                    February 26th 2018, 13:24:04.728
t @version                                      1
t _id                                           AWHSSN4xQjO8IU9hMV6_
t _index                                        auditbeat-6.1.2-2018.02.26
# _score                                        1
t _type                                         logs
t                 nagios
t                 nagios
t                 nagios
t                nagios
t                nagios
t                  nagios
t                 nagios
t                 nagios
t                  nagios
t                    nagios
t                  nagios
t audit.kernel.category                         audit-rule
t                          3
t                          7ffc116c68c0
t                          4000
t                          8
t                        x86_64
t                        sshd
t                         /usr/sbin/sshd
t                        104
t                       0
t                         23310
t                        23308
t                     read
t                         (none)
t                              /usr/sbin/sshd
t audit.kernel.key                              b64_call
t audit.kernel.record_type                      syscall
t audit.kernel.result                           success
# audit.kernel.sequence                         25,226,674
t audit.kernel.session                          16846
t beat.hostname                       
t                                     ptc-desk
t beat.version                                  6.1.2
t host                                
t metricset.module                              audit
t                                kernel
t tags                                          beats_input_raw_event

I'm guessing that in logstash I have to use grok to give me this mapping for the message, and then I can use that to test against the contents of the fields?

e.g. something like

 grok {
		match => { "message" => "%{TIMESTAMP_ISO8601:timestamp} %{TEXT:version} %{TEXT:id} %{TEXT:index} %{TEXT:score} %{TEXT:type} %{} %{} and so on for each of the items in the message" }
 }

Once I have done that, I should be able to directly interrogate items in the message?
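In fact, for Beats input there should be no need to grok at all: auditbeat ships structured events, so the fields in that list already exist on arrival and can be referenced directly with the [outer][inner] syntax. A sketch using two fields from the document above (the tag name is just an illustration):

```conf
filter {
  # Beats events arrive already structured, so nested fields can be
  # tested directly -- no grok required
  if [audit][kernel][record_type] == "syscall" and [audit][kernel][result] == "success" {
    mutate { add_tag => ["kernel_syscall"] }   # hypothetical tag
  }
}
```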

I'm guessing that I have the TEXT syntax incorrect, but I'm just using it as an example.


Perhaps something like this?

TIMESTAMP_ISO8601:timestamp                        February 26th 2018, 13:24:04.728
WORD:version                                       1
WORD:_id                                           AWHSSN4xQjO8IU9hMV6_
WORD:_index                                        auditbeat-6.1.2-2018.02.26
WORD:_score                                        1
WORD:_type                                         logs
                                                   nagios
                                                   nagios
                                                   nagios
                                                   nagios
                                                   nagios
                                                   nagios
                                                   nagios
                                                   nagios
                                                   nagios
                                                   nagios
                                                   nagios
WORD:audit.kernel.category                         audit-rule
                                                   3
                                                   7ffc116c68c0
                                                   4000
                                                   8
                                                   x86_64
                                                   sshd
                                                   /usr/sbin/sshd
                                                   104
                                                   0
                                                   23310
                                                   23308
                                                   read
                                                   (none)
                                                   /usr/sbin/sshd
WORD:audit.kernel.key                              b64_call
WORD:audit.kernel.record_type                      syscall
WORD:audit.kernel.result                           success
WORD:audit.kernel.sequence                         25,226,674
WORD:audit.kernel.session                          16846
WORD:beat.hostname
                                                   ptc-desk
WORD:beat.version                                  6.1.2
WORD:metricset.module                              audit
                                                   kernel
WORD:tags                                          beats_input_raw_event


So something like this (but it is not working):

filter {
  grok {
      match => { "message" => "%{TIMESTAMP_ISO8601:timestamp} %{WORD:version} %{WORD:_id} %{WORD:_index} %{WORD:_score} %{WORD:_type} %{} %{} %{} %{} %{} %{} %{} %{} %{} %{} %{} %{WORD:audit.kernel.category} %{} %{} %{} %{} %{} %{} %{} %{} %{} %{} %{} %{} %{} %{} %{WORD:audit.kernel.key} %{WORD:audit.kernel.record_type} %{WORD:audit.kernel.result} %{WORD:audit.kernel.sequence} %{WORD:audit.kernel.session} %{WORD:beat.hostname} %{} %{WORD:beat.version} %{WORD:host} %{WORD:metricset.module} %{} %{WORD:tags} " }
  }
  if [] == "nagios" {
    drop { }
  }
}


Sheesh, it is easy once you know!

filter {
  if [user][name_map][auid] == "nagios" {
    drop { }
  }
}
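For anyone landing here later, a sketch of how that conditional might sit in a complete pipeline; the port, hosts, and index name below are assumptions, not taken from this thread:

```conf
input {
  beats {
    port => 5044                              # assumed Beats listener port
  }
}

filter {
  # discard auditbeat events attributed to the nagios user
  if [user][name_map][auid] == "nagios" {
    drop { }
  }
}

output {
  elasticsearch {
    hosts => ["localhost:9200"]               # assumed ES address
    index => "auditbeat-%{+YYYY.MM.dd}"       # assumed index pattern
  }
}
```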

(system) #8

This topic was automatically closed 28 days after the last reply. New replies are no longer allowed.