Filebeat processor syntax 7.12.1

Hello,

I want to add some explanatory fields, because the names provided in the logs are not self-explanatory. I guess it's best to do this at the Filebeat level so that no extra resources are used during the pipeline process.

I tried something like this

processors:
  - add_field:
    when:
      equals:
        event.module: "fortinet"
          and:
            observer.egress.interface.name: "v1823"
              fields:
                observer.egress.interface.intname: "APPprod"

I need to assign a specific name for 10 different values of "observer.egress.interface.name"
What is the proper syntax to make this work?

Expected behavior when the document contains

{"observer.egress.interface.name":"v1823",
"event.module":"fortinet"}

Elasticsearch output:

{"event.module":"fortinet"},
"observer.egress.interface.name": "v1823",
"observer.egress.interface.intname": "APPprod"}

I have tested the lines below; Filebeat works, but the test field is not added to the document.

processors:
  - if:
      equals.event.module: "fortinet"
    then:
      - add_field:
        fields:
        observer.egress.interface.intname: "APPprod"

Hello @Adriann :slight_smile:

You seem to be missing a letter on add_field, as it should be add_fields.

I have not tested this myself right now, but if you want something easily readable with multiple or/and conditions, see these two references:

It should look something like this:

processors:
  - add_fields:
      when:
        and:
          - equals:
              event.module: fortinet
          - equals:
              observer.egress.interface.name: "v1823"
      fields: observer.egress.interface.intname: "APPprod"

All of our Filebeat modules also use these sorts of conditions. They are all freely available if you want to use them as references; here is one for AWS that has lots of conditions:

Hello @Marius_Iversen, thank you for your guidance. I have tried your suggestion, but I cannot make it work. Filebeat does not want to start with this configuration:

processors:
  - add_fields:
      when:
        and:
          - equals:
              event.module: fortinet
          - equals:
              observer.egress.interface.name: "v1823"
      fields: observer.egress.interface.intname: "APPprod"

The error says:

filebeat[11979]: Exiting: error loading config file: yaml: line 225: mapping values are not allowed in this context

Line 225 is:

  fields: observer.egress.interface.intname: "APPprod"

Could you try to change
fields: observer.egress.interface.intname: "APPprod"
to:

fields: 
    observer.egress.interface.intname: "APPprod"

Example of the processor: Add fields | Filebeat Reference [7.12] | Elastic
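
For reference, the whole processor with that change could look roughly like the block below (untested sketch). A target: "" is added here because add_fields otherwise puts new keys under a fields.* sub-dictionary rather than at the event root, and, as comes up further down, the condition can only match if observer.egress.interface.name is actually present on the event while it is still in Filebeat:

processors:
  - add_fields:
      when:
        and:
          - equals:
              event.module: fortinet
          - equals:
              observer.egress.interface.name: "v1823"
      # without target, add_fields would create fields.observer.egress... instead
      target: ""
      fields:
        observer.egress.interface.intname: "APPprod"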

Filebeat started, but the document does not contain the additional field. Is it because a document that leaves Filebeat contains only the metadata and the "message" field, and the field "observer.egress.interface.name" only arises later, when it gets to the Elasticsearch ingest node pipeline?

Ah, sorry, I should have picked up that you are using the module; indeed, most of the parsing happens on the ES side.

You could maybe use something like contains, though it's a bit expensive, so it depends on your throughput needs.

Example on contains: Define processors | Filebeat Reference [7.12] | Elastic

So something like:

contains:
  message: "v1823"

I was thinking about that, but it would be hard to tell whether the field should be labeled ingress or egress.

It might be a bit inconvenient for some, but personally I like to modify and shape ingest pipelines to my own use cases, as we usually have much more control over the data in the ingest pipelines themselves. If you are open to that, you could always append your changes to the existing pipelines.

There are three ways to go about that.

  1. You grab a list of all ingest pipelines, copy the one you want to edit (in this case the fortinet one), go to the Kibana Dev Tools, paste it in, add/modify your changes, and PUT it back to overwrite the existing one.
    The only downside to this is that whenever you upgrade the Beats version, you need to reapply your changes.

  2. You force the index (like the filebeat index) to run an ingest pipeline after everything else has finished, including the pipelines from the module. This can be done like this (see the sketch after this list):
    Index modules | Elasticsearch Guide [7.12] | Elastic
    Under: index.final_pipeline

  3. You disable the module in Filebeat, define the input yourself in filebeat.yml, and hardcode the pipeline in your input settings to the name of your own custom ingest pipeline. At the end of your custom ingest pipeline, you send the data on to the fortinet ingest pipeline with what we call a pipeline processor (a processor used inside an ingest pipeline).
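
As a rough sketch of option 2, the setup could look something like the following in Kibana Dev Tools. The pipeline name, the processor logic and the index pattern here are only placeholders for illustration, not something that comes from the module:

PUT _ingest/pipeline/my-final-pipeline
{
  "description": "Runs after the module pipelines have finished",
  "processors": [
    {
      "set": {
        "if": "ctx.observer?.egress?.interface?.name == 'v1823'",
        "field": "observer.egress.interface.intname",
        "value": "APPprod"
      }
    }
  ]
}

PUT filebeat-7.12.1-*/_settings
{
  "index.final_pipeline": "my-final-pipeline"
}

For future indices you would likely also want to set index.final_pipeline in the index template, so it is applied automatically after rollover.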

Each option has its own quirks, and as always in tech there are a million different ways to get the results you want, so I hope I can at least offer a few different approaches.

If they all seem pretty daring or too much work, you would need to continue to look at the different options you have on Beats itself, like using other processors such as script or dissect to retrieve the information you need, or multiple contains, for example.
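
If you stay on the Beats side, the "multiple contains" idea might look roughly like this, with one add_fields per interface name. This is an untested sketch; the second interface name and its label are made-up placeholders, a target: "" is needed so the key lands at the event root, and matching on the raw message cannot tell ingress and egress apart, as you pointed out above:

processors:
  - add_fields:
      when:
        contains:
          message: "v1823"
      target: ""
      fields:
        observer.egress.interface.intname: "APPprod"
  - add_fields:
      when:
        contains:
          message: "v1824"
      target: ""
      fields:
        observer.egress.interface.intname: "OtherName"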

It also might be useful to ask why you want to do this: is there a reason why you would like to do this at ingest time?

1 Like

Thank you very much for your insights on this topic. I will go with the second option, because I want to avoid, as much as possible, the need for changes during/after updates.

I want to put as much of the stress as possible on the separate server where Filebeat runs rather than on the Elasticsearch node server (right now I have only one node). I imagine that in case of a burst of events it's better to put the stress on the ingest side of the ELK stack. I would rather have trouble with one segment of the monitoring system (the ingest side) than with the whole system's core. I am not sure if this is correct or necessary, but it's my gut feeling.

I have set up the final_pipeline value for the index. I tested it and it works. But now I am having trouble with making a proper script for this case. Could you give me a hint on how to make it work?

def oein = ctx?.observer?.egress?.interface?.name;
if (oein != null) {
    if (oein.contains("v1823")){  # I was testing both this and (oein == "v1823")
        ctx._source.test.field = "APPprod"
    }
}

You should be able to do just ctx.test.field = "APPprod"

However if test does not exist, you need to create it first.

if (ctx.test == null) {
  ctx.test = new HashMap();
}

The error happens when you try to access a field that does not exist. It might be that oein is not null but an empty string (you can try using !oein.isEmpty() as a second condition), or it could be that you are accessing ctx._source, which does not exist in an ingest pipeline (the document's fields are available directly on ctx).
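
Putting those hints together, a sketch of the whole script might look like this. It is written for an ingest pipeline script processor, so fields are read directly from ctx rather than ctx._source, and the test.field name is just carried over from the snippet above:

def oein = ctx.observer?.egress?.interface?.name;
// only act when the interface name exists, is not empty, and matches
if (oein != null && !oein.isEmpty() && oein.contains("v1823")) {
    // create the parent object first if it does not exist yet
    if (ctx.test == null) {
        ctx.test = new HashMap();
    }
    ctx.test.field = "APPprod";
}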

1 Like

I would also recommend using the simulate pipeline API; that way you can test your ingest pipelines on the fly: Simulate pipeline API | Elasticsearch Guide [7.12] | Elastic
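
For example, the script from above could be tested with something like this in Dev Tools (the sample document is just made up to match this thread):

POST _ingest/pipeline/_simulate
{
  "pipeline": {
    "processors": [
      {
        "script": {
          "source": "def oein = ctx.observer?.egress?.interface?.name; if (oein != null && !oein.isEmpty() && oein.contains('v1823')) { if (ctx.test == null) { ctx.test = new HashMap(); } ctx.test.field = 'APPprod'; }"
        }
      }
    ]
  },
  "docs": [
    {
      "_source": {
        "event": { "module": "fortinet" },
        "observer": { "egress": { "interface": { "name": "v1823" } } }
      }
    }
  ]
}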

If you look at the parser we use for cisco, it has lots of examples of how you can do matching against large lists, which would be useful for your use case of looking up many interface names.

1 Like

This topic was automatically closed 28 days after the last reply. New replies are no longer allowed.