Creating a Kafka topic automatically while sending logs from Filebeat

Is it possible to create Kafka topics automatically, based on event fields, while sending logs from Filebeat? I want to ship logs from hosts through Filebeat to Kafka so that the topics are created automatically, and then have Logstash read from the Kafka topics dynamically and route the logs to different Elasticsearch clusters. How can I achieve that?

Here is my filebeat configuration:

output.kafka:
  hosts: ['qakafka1:9092']
  topic: "qafilebeat"
  topics:
    - topic: "%{[attribute.APPLICATION]}-%{[logType]}-logs"
      when:
        or:
          - equals:
              logType: "access"
          - equals:
              logType: "error"
    - topic: "%{[attribute.APPLICATION]}-error-logs"
      when:
        regexp:
          attribute.APPLICATION: ".+"
  partition.round_robin:
    reachable_only: False
  required_acks: 1
  compression: gzip
  max_message_bytes: 1000000

Here is logstash configuration:

input{
  kafka
  {
    bootstrap_servers => "qakafka1:9092"
    topics => ["logstash", "qafilebeat", "prfilebeat", "app1-error-logs", "app1-access-logs"]
    auto_offset_reset => "latest"
    decorate_events => true
    group_id => "logstash_indexer"
  }
}

Yes, you can have Beats create topics dynamically. But for this to work, the Kafka brokers must be configured to allow automatic topic creation (`auto.create.topics.enable`, which is enabled by default).
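If that setting has been disabled on your brokers, a minimal sketch of the broker-side configuration (in `server.properties`) looks like this — the partition and replication values are illustrative, not requirements:

```properties
# Allow producers (e.g. the Filebeat Kafka output) to trigger topic creation
auto.create.topics.enable=true

# Defaults applied to any auto-created topic (illustrative values)
num.partitions=3
default.replication.factor=1
```

Keep in mind that auto-created topics always get these cluster-wide defaults; if different topics need different partition counts or retention, you have to create them explicitly instead.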

The `topic` setting can also be a single format string with a fallback value. For example, if `logType` is not always present in the event, you can default it to `error`:

output.kafka:
  topic: '%{[attribute.APPLICATION]}-%{[logType]:error}-logs'
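On the Logstash side, rather than listing every topic by name, you can subscribe by pattern with `topics_pattern` and route on the topic name that `decorate_events => true` records in `@metadata`. A sketch, assuming all auto-created topics end in `-logs`; the Elasticsearch hosts are placeholders:

```
input {
  kafka {
    bootstrap_servers => "qakafka1:9092"
    topics_pattern    => ".*-logs"     # matches current and future topics
    auto_offset_reset => "latest"
    decorate_events   => true          # adds [@metadata][kafka][topic] etc.
    group_id          => "logstash_indexer"
  }
}

output {
  # Route by source topic; metadata fields are not written to Elasticsearch
  if [@metadata][kafka][topic] =~ /-error-logs$/ {
    elasticsearch { hosts => ["es-errors:9200"] }   # placeholder cluster
  } else {
    elasticsearch { hosts => ["es-access:9200"] }   # placeholder cluster
  }
}
```

Note that `topics_pattern` and a fixed `topics` list are mutually exclusive; with the pattern form, new topics created by Filebeat are picked up automatically after the consumer's next metadata refresh.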
