I want to add some field explanations, because the names that are provided via the logs are not self-explanatory enough. I guess it's best to do this operation at the Filebeat level, to avoid using resources during the pipeline process.
All of our Filebeat modules also use these sorts of conditions; they are all freely available if you want to use them as references. Here is one for AWS that has lots of conditions:
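As a minimal sketch of what such a condition can look like in filebeat.yml (the interface name and description below are example values, not taken from any module):

```yaml
processors:
  # Add a human-readable description when the egress interface matches.
  # "v1823" and the description text are placeholder values for illustration.
  - add_fields:
      target: interface
      fields:
        description: "APP production"
      when:
        equals:
          observer.egress.interface.name: "v1823"
```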
Hello @Marius_Iversen, thank you for your guidance. I have tried your suggestion, but I cannot make it work. Filebeat does not want to start with this configuration:
Filebeat started, but the document does not contain the additional field. Is it because a document that leaves Filebeat contains all the metadata and the "message" field, while the field "observer.egress.interface.name" arises later, when it gets to the Elasticsearch ingest node pipeline?
It might be a bit inconvenient for some, but personally I like to modify and shape ingest pipelines to my own use cases, as we usually have much more control of the data in the ingest pipelines themselves. If you are open to that, you could always append your changes to the existing pipelines.
There are three ways to go about that.
You grab a list of all ingest pipelines, copy the one you want to edit (in this case the Fortinet one), go to the Kibana Dev Tools, paste it in, add/modify your changes, and PUT it back to overwrite the existing one.
The only downside to this is that whenever you upgrade the Beats version, you need to reapply your changes.
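For the first option, the workflow in Kibana Dev Tools looks roughly like this (the pipeline name below is a guess; run GET _ingest/pipeline to find the exact name for your Beats version):

```
# Fetch the existing module pipeline and copy its body from the response
GET _ingest/pipeline/filebeat-7.12.1-fortinet-firewall-pipeline

# Paste the body back with your changes added, overwriting the original
PUT _ingest/pipeline/filebeat-7.12.1-fortinet-firewall-pipeline
{
  "description": "...",
  "processors": [
    ...
  ]
}
```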
You force the index (like the Filebeat index) to run an ingest pipeline after everything else has finished, including the pipelines from the module. This can be done like this: Index modules | Elasticsearch Guide [7.12] | Elastic
Under: index.final_pipeline
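As a sketch of the second option (the pipeline name, processor, and index pattern are placeholders for illustration):

```
# Create your own pipeline first
PUT _ingest/pipeline/my-final-pipeline
{
  "processors": [
    { "set": { "field": "test.field", "value": "APPprod" } }
  ]
}

# Then have the index run it after all other pipelines have finished
PUT filebeat-*/_settings
{
  "index.final_pipeline": "my-final-pipeline"
}
```

Note that index.final_pipeline applies per index, so for newly created indices you would want to set it in the index template as well.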
You disable the module in Filebeat yourself and instead just define the input in filebeat.yml, hardcoding the pipeline in your input settings to the name of your own custom ingest pipeline. At the end of your own custom ingest pipeline, you send the data to the Fortinet ingest pipeline with what we call a pipeline processor (a processor used in the ingest pipeline).
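A sketch of the third option (all names here are placeholders; the Fortinet pipeline name depends on your Beats version): you would set `pipeline: my-custom-pipeline` on the input in filebeat.yml, and your custom pipeline would end with a pipeline processor that hands the document on:

```
PUT _ingest/pipeline/my-custom-pipeline
{
  "processors": [
    { "set": { "field": "test.field", "value": "APPprod" } },
    { "pipeline": { "name": "filebeat-7.12.1-fortinet-firewall-pipeline" } }
  ]
}
```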
Each option has its own quirks, and as always in tech there are a million different ways to get the results you want; I hope I can at least offer a few of them.
If they all seem pretty daring or too much work, you would need to continue looking at the different options you have on Beats itself, like using processors such as script or dissect to retrieve the information you need, or multiple contains conditions, for example.
It might also be useful to ask why you want to do this: is there a reason you would like to do this at ingest time?
Thank you very much for your insights on this topic. I will go right now with the second option, because I want to avoid the need for changes during/after updates as much as possible.
I want to put as much of the possible stress as I can on the separate server where Filebeat runs rather than on the Elasticsearch node server (right now I have only one node). I imagine that in case of a burst of events it's better to put the stress on the ingestion side of the ELK stack: I would rather have trouble with one segment of the monitoring system (the ingest node) than with the whole system's core point. I am not sure if this is correct or necessary, but it's my gut feeling.
I have set up the final pipeline value for the index. I tested it and it works. But now I am having trouble making a proper script for this case. Could you give me a hint on how to make it work?
def oein = ctx?.observer?.egress?.interface?.name;
if (oein != null) {
  if (oein.contains("v1823")) { // I was testing both this and (oein == "v1823")
    ctx._source.test.field = "APPprod";
  }
}
You should be able to do just ctx.test.field = "APPprod".
However, if test does not exist, you need to create it first:
if (ctx.test == null) {
ctx.test = new HashMap();
}
The error happens when you try to access a field that does not exist. It might be that oein is not null but an empty string (you can try adding !oein.isEmpty() as a second condition), or it could be that you are trying to access ctx._source; in an ingest pipeline the document root is ctx itself.
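Putting those pieces together, a corrected version of your script could look like this (an untested sketch, keeping the field names from your example):

```
// Null-safe lookup: oein is null if any segment of the path is missing
def oein = ctx?.observer?.egress?.interface?.name;
if (oein != null && !oein.isEmpty()) {
  // Create the "test" object first so assigning test.field cannot fail
  if (ctx.test == null) {
    ctx.test = new HashMap();
  }
  if (oein.contains("v1823")) {
    ctx.test.field = "APPprod";
  }
}
```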
If you look at the parser we use for Cisco, it has lots of examples of how you can match against large lists, which would be useful for your use case of looking for many interface names.