Custom log output to Kafka using Elastic Agent

We are running Elastic Stack 8.15, with client Filebeat output going to Kafka and from there to Elasticsearch. Filebeat sends different logs to separate Kafka topics. We are using the following configuration in "filebeat.yml" (only the relevant lines are shown):

filebeat.inputs: 
- type: filestream
  id: fs1
  enabled: true
  paths:
    - E:/Logs/folder1/*.log
  fields:
    kafka_topic: topic1

- type: filestream
  id: fs2
  enabled: true
  paths:
    - E:/Logs/folder2/*.log
  fields:
    kafka_topic: topic2

output.kafka:
  enabled: true
  topic: '%{[fields.kafka_topic]}'

We'd like to start using Elastic Agent with Fleet, which is already working, but we have one issue: how do we configure the Kafka output with dynamic topics? I understand that it should be done by adding the Custom Logs integration to the Agent policy and then adding some configuration to the "processors" field. Has anyone figured out how this needs to be done?

You can't, Elastic Agent does not support dynamic topics.

Well, I was wrong, it seems that they added it back.

Support for dynamic topics was removed, but it seems that they reverted this decision and added it back, according to this GitHub pull request:

According to the linked PR you would need to configure it while configuring the output.

Yes, I know that they removed that feature and that it's now back.

So, if I configure the output that way, how should I configure the "custom log" integration?

I do not use this integration, but checking the documentation it seems that you would need to use the add_fields processor.

Something like this:

- add_fields:
    target: fields
    fields:
      kafka_topic: test-vm-esagentsda1

The main difference is that you need one Custom Logs integration per Filebeat input. Your example has 2 inputs, so you would need to add a separate Custom Logs integration for each of them.
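As a rough, untested sketch of what that could look like for the two inputs in the question: the topic names topic1 and topic2 and the field name kafka_topic come from the original filebeat.yml, and each part goes into the Processors field of its own integration instance (they are not meant to be combined into one list).

```yaml
# Processors field of the Custom Logs integration that reads E:/Logs/folder1/*.log
- add_fields:
    target: fields
    fields:
      kafka_topic: topic1
---
# Processors field of the second Custom Logs integration (E:/Logs/folder2/*.log)
- add_fields:
    target: fields
    fields:
      kafka_topic: topic2
```

With target set to fields, each event carries fields.kafka_topic, which is the path that '%{[fields.kafka_topic]}' refers to.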

PS C:\Program Files\Elastic\Agent> .\elastic-agent.exe status
┌─ fleet
│  └─ status: (STARTING)
└─ elastic-agent
   ├─ status: (DEGRADED) 1 or more components/units in a failed state
   └─ log-b2d12f01-f359-45dd-be6c-e67c43b81cc9
      ├─ status: (HEALTHY) Healthy: communicating with pid '4504'
      ├─ log-b2d12f01-f359-45dd-be6c-e67c43b81cc9
      │  └─ status: (FAILED) could not start output: failed to reload output: topic '%{[fields.kafka_topic]}' is invalid, it must match '[a-zA-Z0-9._-]' accessing 'kafka'
      └─ log-b2d12f01-f359-45dd-be6c-e67c43b81cc9-logfile-logs-6f3c2805-c5dd-45dc-944a-d8e101173873
         └─ status: (STARTING) Starting

Yeah, it seems to not be on 8.15.0 yet.

8.15 was released August 8th, and the revert commit was merged on August 14th.

You will probably need to wait for 8.15.1 or maybe even 8.16.


I've installed 8.16, and dynamic topics are available and working. I configured the Kafka output like this

and then added processors like this to every integration (with a different topic name in each):

- add_fields:
    target: fields
    fields:
      integ_topic: test-vm-eagentsda1
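
For anyone comparing this with a standalone agent, here is a sketch of what an equivalent output section might look like. It assumes the dynamic topic is meant to reference the field added by the processor above. The broker address is a hypothetical placeholder, and in Fleet the topic is set in the output's settings rather than written as YAML.

```yaml
outputs:
  default:
    type: kafka
    hosts:
      - "kafka1.example.internal:9092"   # hypothetical broker address
    # add_fields with target "fields" nests the key under "fields",
    # so the format string needs the full path fields.integ_topic
    topic: '%{[fields.integ_topic]}'
```

The field name in the format string has to match the key used in each integration's add_fields processor, otherwise the topic cannot be resolved for those events.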