Do we need to add ECS Logger to our Filebeat just to get log.level as a field in the JSON?

I've been rereading the Filebeat documentation to try to deepen my understanding of how it works.

We have been configuring Filebeat manually with inputs like this:

- type: filestream
  id: my-api
  fields:
    app_id: my-api
  paths:
    - /opt/myappdev/logs/my-api-logs/*.log

We can see logs from the above input being collected and indexed into our Elasticsearch cluster. Each line of the log file is collected into a JSON document and stored in a message field, with fields like @timestamp and agent.id added by Filebeat.
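
For illustration, a document from this input looks roughly like the following (values here are made up and trimmed to the fields mentioned above):

{
  "@timestamp": "2023-04-11T22:10:26.107Z",
  "agent": { "id": "example-agent-id" },
  "fields": { "app_id": "my-api" },
  "message": "<the original log line>"
}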

The documentation at Introduction | ECS Logging Reference | Elastic mentions a log.level field. That is one field we do not see in the JSON documents indexed by Filebeat into Elasticsearch.

At present, we can see the log level embedded in the message field. In the example below, the log level is INFO:

"message": "2023-04-11 22:10:26,107 INFO com.intelsat.ams.clients.AmsKafkaConsumerClient [task-1] Total records processed:80752"

The application that generates the log file is written in Java. If we want log.level to show up in the JSON, do we need to add the Java ECS logger, or is there a setting in filebeat.yml that I am missing?

No, there is no need; you just need to parse your message before indexing it into Elasticsearch.

If you are sending your logs with Filebeat directly to Elasticsearch, you can do that using an ingest pipeline with, for example, a dissect processor.
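
If it helps, here is a minimal sketch of such a pipeline, assuming your log lines follow the layout of the example you posted (the pipeline name, the exact pattern, and the field names it extracts into are illustrative):

PUT _ingest/pipeline/my-api-logs
{
  "description": "Pull log.level out of the log line stored in message",
  "processors": [
    {
      "dissect": {
        "field": "message",
        "pattern": "%{?date} %{?time} %{log.level} %{log.logger} [%{process.thread.name}] %{?rest}"
      }
    }
  ]
}

The %{?...} keys skip parts you do not want to keep as fields. Filebeat can then be pointed at the pipeline via the pipeline setting of its Elasticsearch output, or the pipeline can be attached to the index through index.default_pipeline.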


Thank you. I set up an ingest pipeline with a dissect processor, and it is extracting the log level just as we wanted.

As it turns out, our Filebeat instance is set up with 3 filestream inputs. Incoming data from all 3 inputs is treated the same way by the pipeline and its dissect processor.

I was just informed that the log data coming into the 3rd input will be modified, so the pattern used by the dissect processor will no longer match documents that come through that input.

Would a good approach be to enable Ignore Failure for this dissect processor, and put another dissect processor after it to handle documents coming into the 3rd input?

You could, or you could put a tag or field in each input and then apply logic with a condition in the ingest pipeline.

An if condition is more efficient than letting the dissect processor fail.
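
For example, each filestream input can carry its own identifying field, and the pipeline can then branch on it (the second input's id, app_id, and path below are illustrative):

filebeat.inputs:
- type: filestream
  id: my-api
  fields:
    app_id: my-api
  paths:
    - /opt/myappdev/logs/my-api-logs/*.log
- type: filestream
  id: third-app
  fields:
    app_id: third-app
  paths:
    - /opt/myappdev/logs/third-app-logs/*.log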


Just to confirm, conditional pipelines are not supported in Elastic 7.17, correct? I do not see a field for entering an if condition here.

Also, I set ignore_failure to true on my Dissect processor and added a Fail processor to my Failure processors with message = error.message.

When I get a compile error in my pipeline, I do not see output from the Fail processor with the error message that I expect to see, like invalid syntax or whatever.

What am I doing wrong in this case?

My guess is that the condition I put in my Dissect processor is incorrect, but all I see is "compile error", which tells me nothing about what is wrong with the condition. BTW, the condition is:

fields.app_id = my-api-app

try

ctx?.fields?.app_id == 'my-api-app'

Look at this for how conditions work

So when you see this in the docs:

"if": "ctx?.network?.name == 'Guest'"

what goes in the UI Condition field is just:

ctx?.network?.name == 'Guest'
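
For reference, in the API form of the pipeline the condition sits on the processor itself (the pipeline name, pattern, and app_id value here are illustrative):

PUT _ingest/pipeline/my-api-logs
{
  "processors": [
    {
      "dissect": {
        "if": "ctx?.fields?.app_id == 'my-api-app'",
        "field": "message",
        "pattern": "%{?date} %{?time} %{log.level} %{?rest}"
      }
    }
  ]
}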


Thanks, we are closer to what we want now! One pipeline ingesting documents from our Filebeat, with 2 dissect processors, one for each fields.app_id value.

I was able to test against documents already in the index using the UI that lets me input a document ID and index.
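
(For anyone following along, the same kind of test can also be run through the API with the simulate endpoint, pasting in a copy of a document's _source; the pipeline name and document below are just an example.)

POST _ingest/pipeline/my-api-logs/_simulate
{
  "docs": [
    {
      "_source": {
        "fields": { "app_id": "my-api-app" },
        "message": "2023-04-11 22:10:26,107 INFO com.intelsat.ams.clients.AmsKafkaConsumerClient [task-1] Total records processed:80752"
      }
    }
  ]
}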

Is there a way to grab documents that were indexed into Elasticsearch the day before I finished working on our ingest pipeline? Or do I need to wait for new documents to be indexed before I can use the transformed JSON for visualizations?
