Drop filter to keep documents from being sent to ES is not working

Hi everybody.

I have an ES + Kibana + Logstash + Filebeat environment set up for testing purposes, and after installing Filebeat so that it sends data to Logstash, I've realized that I'm receiving more documents than necessary, as some of them are completely irrelevant to me. I know this is a very common problem that has been discussed many times, but although I've tried all the solutions I've found, none of them is working for me:

I'm trying to get rid of all events that are created as a consequence of the 'avahi-daemon'.

I can find those documents in Kibana's Analytics/Discover section in the following format:

@timestamp            Jun 7, 2022 @ 17:29:46.000
@version              1
agent.ephemeral_id    50154ac2-98bb-4362-84ff-8e5d673c836d
agent.hostname        ubuntuelk
agent.id              4f5d3321-54c8-4858-851e-a30d531eedbd
agent.name            ubuntuelk
agent.type            filebeat
agent.version         8.2.1
ecs.version           1.12.0
event.dataset         system.syslog
event.ingested        Jun 7, 2022 @ 17:29:50.316
event.kind            event
event.module          system
event.original        Jun 7 17:29:46 ubuntuelk avahi-daemon[769]: Registering new address record for fe80::94ca:bd6f:70f5:3d41 on ens33.*.
event.timezone        +02:00
fileset.name          syslog
host.architecture     x86_64
host.containerized    false
host.hostname         ubuntuelk
host.id               4293b3061fe540d9b3cdd77a03af46c3
host.ip               192.168.0.111, fe80::6b28:900b:b61d:b756, fe80::5d73:dcbe:a241:d705,

If I run a query for them in the 'Dev Tools' console, this is what they look like:

{
        "_index" : ".ds-filebeat-8.2.1-2022.06.06-000001",
        "_id" : "rmjNPoEBevuxJxIgmcAI",
        "_score" : 1.0,
        "_source" : {
          "agent" : {
            "name" : "ubuntuelk",
            "id" : "4f5d3321-54c8-4858-851e-a30d531eedbd",
            "type" : "filebeat",
            "ephemeral_id" : "50154ac2-98bb-4362-84ff-8e5d673c836d",
            "version" : "8.2.1"
          },
          "process" : {
            "name" : "avahi-daemon",
            "pid" : 769
          },
          "log" : {
            "file" : {
              "path" : "/var/log/syslog"
            },
            "offset" : 2222249
          },
          "fileset" : {
            "name" : "syslog"
          },
          "message" : "Withdrawing address record for fe80::6b28:900b:b61d:b756 on ens33.",
          "tags" : [
            "beats_input_codec_plain_applied"
          ],
          "input" : {
            "type" : "log"
          },
          "@timestamp" : "2022-06-07T17:34:26.000+02:00",
          "system" : {
            "syslog" : { }
          },
          "ecs" : {
            "version" : "1.12.0"
          },
          "related" : {
            "hosts" : [
              "ubuntuelk"
            ]
          },
          "service" : {
            "type" : "system"
          },
          "host" : {
            "hostname" : "ubuntuelk",
            "os" : {
              "kernel" : "5.13.0-44-generic",
              "codename" : "focal",
              "name" : "Ubuntu",
              "type" : "linux",
              "family" : "debian",
              "version" : "20.04.3 LTS (Focal Fossa)",
              "platform" : "ubuntu"
            },
            "ip" : [
              "192.168.0.111",
              "fe80::6b28:900b:b61d:b756",
              "fe80::5d73:dcbe:a241:d705",
              "fe80::94ca:bd6f:70f5:3d41"
            ],
            "containerized" : false,
            "name" : "ubuntuelk",
            "id" : "4293b3061fe540d9b3cdd77a03af46c3",
            "mac" : [
              "00:0c:29:f5:4e:bf"
            ],
            "architecture" : "x86_64"
          },
          "@version" : "1",
          "event" : {
            "ingested" : "2022-06-07T15:34:30.406056321Z",
            "original" : "Jun  7 17:34:26 ubuntuelk avahi-daemon[769]: Withdrawing address record for fe80::6b28:900b:b61d:b756 on ens33.",
            "timezone" : "+02:00",
            "kind" : "event",
            "module" : "system",
            "dataset" : "system.syslog"
          }
        }
      }

As you can see, this document, like many others, has the field process.name with the value "avahi-daemon".

I'm trying to set up a filter in my Logstash pipeline configuration file to drop those documents.

This is how it looks:

input {
    beats {
        port => "5044"
    }
}

filter {
  if "avahi-daemon" in [process.name] {
    drop { }
  }
}

output {
  if [@metadata][pipeline] {
    elasticsearch {
      hosts => ["https://192.168.0.111:9200","https://192.168.0.112:9200","https://192.168.0.113:9200"]
      cacert => '/certs/elastic/http_ca.crt'
      pipeline => "%{[@metadata][pipeline]}"
      user => "${LS_USER}"
      password => "${LS_PWD}"
      manage_template => false
      index => "%{[@metadata][beat]}-%{[@metadata][version]}"
      action => "create"
    }
  } else {
    elasticsearch {
      hosts => ["https://192.168.0.111:9200","https://192.168.0.112:9200","https://192.168.0.113:9200"]
      cacert => '/certs/elastic/http_ca.crt'
      user => "${LS_USER}"
      password => "${LS_PWD}"
      manage_template => false
      index => "%{[@metadata][beat]}-%{[@metadata][version]}"
      action => "create"
    }
  }
}

But as you can guess, this is not working.

Could someone please tell me what I am doing wrong?

Thank you very much in advance.

Carlos T.

This needs to be:

if "avahi-daemon" in [process][name]

Using [process.name] in Logstash means that you want to filter on a field literally named process.name, where the dot is part of the field name.

In summary:

  • [process][name] in Logstash refers to { "process": { "name": "some-value" } }
  • [process.name] in Logstash refers to { "process.name": "some-value" }
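
To make the difference concrete, here is a minimal sketch of the two conditionals side by side. It assumes the event actually carries a process name when it reaches the filter stage, which depends on where that field is created. It also uses == for an exact match instead of the substring test with "in"; that is an illustrative choice, not a requirement.

```logstash
filter {
  # Nested reference: matches { "process": { "name": "avahi-daemon" } }
  if [process][name] == "avahi-daemon" {
    drop { }
  }

  # Literal reference: only matches a top-level key named "process.name",
  # i.e. { "process.name": "avahi-daemon" }
  if [process.name] == "avahi-daemon" {
    drop { }
  }
}
```

If neither conditional ever fires, the likely explanation is that the field does not exist yet at this point in the flow, rather than a syntax problem.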

Hi Leandro. Thank you very much for your answer and explanation. For some reason I don't know, the [process][name] approach didn't work, but the [event][original] one did.

So I used:

filter {
  if "avahi-daemon" in [event][original] {
    drop { }
  }
}

My guess is that I can only use fields that are shown in the Analytics/Discover section. Could you please confirm if this is the normal behaviour?

Thank you very much again and regards.

Carlos T.

In Logstash filters you can only use fields that exist in the document while it is in the Logstash pipeline.

Looking more closely at the document you shared and your configuration, it seems that you are using an ingest pipeline in Elasticsearch, so it is probably this pipeline that parses your message and creates fields like process.name. That means the field [process][name] does not exist in the document while it is passing through the Logstash pipeline; it is only created after the ingest pipeline, running in Elasticsearch, parses your message.

In this case you need to use [event][original] or sometimes the [message] field.
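
As a rough sketch of that approach, the filter below drops events based on the raw line. It assumes [message] still holds the unparsed syslog line when the event reaches Logstash and that the line uses the usual "program[pid]:" tag; the regular expression is only an illustration and may need adjusting for your logs.

```logstash
filter {
  # Assumption: [message] contains the raw syslog line at this stage.
  # The pattern targets the "avahi-daemon[<pid>]:" program tag rather than
  # any occurrence of the substring elsewhere in the line.
  if [message] =~ /avahi-daemon\[\d+\]:/ {
    drop { }
  }
}
```

Matching on the program tag instead of a bare substring makes it less likely that an unrelated message which merely mentions avahi-daemon gets dropped.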


Now I think I understand how it works, and now that I do, it would be useful to be able to get a list of the fields available in a document while it is passing through the Logstash pipeline.
Is there any way to get a list of those fields?

Thank you Leandro

You can just run a few lines through the console output.

That should show what is available.
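
For example, a temporary output along these lines prints each event to Logstash's standard output with all of the fields it has at that point. This is a minimal sketch meant for debugging only; it can sit next to the existing elasticsearch output or replace it while testing.

```logstash
output {
  # Temporary debugging output; remove it once the field names are known.
  stdout {
    codec => rubydebug {
      # Also print the [@metadata] fields, which are hidden by default.
      metadata => true
    }
  }
}
```

Any field that shows up in this output can be referenced in a filter conditional using the bracket syntax, for example [event][original].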

Hi again. Thanks to both of you for the useful info and advice.
@leandrojmp I've tried [event][original] and it worked.

@stephenb I've been getting the document fields and values from the standard output of the Filebeat console, and I could find the document I was interested in. To my surprise, I only found the substring 'avahi-daemon' in the 'message' field and nowhere else. So I've also tried using [message] in the Logstash pipeline filter, and it works too.

So both approaches seem to work fine. That said, which one would you recommend?

Choosing the field from the Kibana Discover output, or choosing the field from the Filebeat standard output? Or does it depend on the kind of document, with each situation being different?

Sorry for asking so many questions. I just want to know what the best practice is.

And thanks again to both of you. :)

It depends on where the processing is happening...

You could imagine a pipeline where some processing is happening in Filebeat, some in Logstash and some in Elasticsearch in an ingest pipeline. Kibana Discover will show the final, completely processed document ... but you will not know what it looked like at each point. So you need to understand the format / schema of the document at the point where you want to process it.

And depending on the logic at each step of the process, the document could look different.

That is why I try to do all the processing in one place ... Logstash or an ingest pipeline. It is much easier to debug, update and understand when all the code is in one place.

The only thing I normally do in Filebeat is drop unnecessary messages, which is what you are trying to do... but Filebeat will need access to the field that you want to use to conditionally drop... so sometimes I just opt to send them all to Logstash and use that to drop the messages.

There are many ways to accomplish the processing... The optimal one depends on your use case and requirements / goals.

Hope that makes sense.

I think it is perfectly explained. I had been getting all the field names from the Filebeat console, and I used those field names in Logstash's pipeline.conf file to filter and drop the documents I didn't want included, and that's why I couldn't find the [event][original] field. So what I've just done is use the Logstash console standard output to see which fields I could find there. Among a lot of other info, I found:

 "message" => "Jun  8 20:12:52 ubuntuelk02 avahi-daemon[773]: Registering new address record for 127.0.0.1 on lo.IPv4.",
         "event" => {
        "timezone" => "+02:00",
          "module" => "system",
        "original" => "Jun  8 20:12:52 ubuntuelk02 avahi-daemon[773]: Registering new address record for 127.0.0.1 on lo.IPv4.",
         "dataset" => "system.syslog"

This means that Logstash can find the substring 'avahi-daemon' in both the [event][original] and [message] fields, and that's why both approaches I tried before worked. Now I see why it is working, and I think I know where I have to look in future cases. And I agree with you: I think it makes sense to manage all this filtering in the same component of the pipeline, and Logstash makes sense for me, at least in my modest home environment. But I intend to get it done at my job before long.

Well, Stephen and Leandro, many, many thanks again to both of you. This not only solves my problem but also helps me understand it.

Regards.

Carlos T.
