Is it possible to create Kafka topics automatically based on event fields when sending logs from Filebeat? I want to ship logs from hosts through Filebeat to Kafka, have the topics created automatically, and then have Logstash read from the Kafka topics dynamically and route the logs to different Elasticsearch clusters. How can I achieve this?
Here is my Filebeat configuration:
output.kafka:
  hosts: ['qakafka1:9092']
  topic: "qafilebeat"
  topics:
    - topic: "%{[attribute.APPLICATION]}-%{[logType]}-logs"
      when:
        or:
          - equals:
              logType: "access"
          - equals:
              logType: "error"
    - topic: "%{[attribute.APPLICATION]}-error-logs"
      when:
        regexp:
          attribute.APPLICATION: ".+"
  partition.round_robin:
    reachable_only: false
  required_acks: 1
  compression: gzip
  max_message_bytes: 1000000
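For topics to be created on the fly, I assume the Kafka brokers need automatic topic creation enabled in `server.properties` (the partition and replication values below are placeholders, not from my actual setup):

```properties
# Kafka broker settings (server.properties) - assumed, not verified on my cluster
auto.create.topics.enable=true
# defaults applied to any auto-created topic
num.partitions=3
default.replication.factor=1
```

With this enabled, the first produce request Filebeat sends to a non-existent topic should create it with these defaults.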
Here is my Logstash configuration:
input {
  kafka {
    bootstrap_servers => "qakafka1:9092"
    topics => ["logstash", "qafilebeat", "prfilebeat", "app1-error-logs", "app1-access-logs"]
    auto_offset_reset => "latest"
    decorate_events => true
    group_id => "logstash_indexer"
  }
}
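Instead of listing topics statically, I was thinking Logstash could subscribe by pattern and route on the source topic name that `decorate_events` records in `[@metadata][kafka][topic]`. This is only a sketch of what I have in mind; the `topics_pattern` value and the Elasticsearch cluster hosts are hypothetical:

```
input {
  kafka {
    bootstrap_servers => "qakafka1:9092"
    # subscribe to every topic ending in -logs; newly created topics
    # matching the pattern are picked up without a config change
    topics_pattern => ".*-logs"
    auto_offset_reset => "latest"
    decorate_events => true
    group_id => "logstash_indexer"
  }
}

output {
  # route on the topic the event came from
  if [@metadata][kafka][topic] =~ /-error-logs$/ {
    elasticsearch { hosts => ["es-cluster-a:9200"] }  # hypothetical cluster
  } else {
    elasticsearch { hosts => ["es-cluster-b:9200"] }  # hypothetical cluster
  }
}
```

Would this be a reasonable way to get the dynamic topic reading and per-cluster routing, or is there a better pattern for this?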