I installed Filebeat 5.0 on my app server and configured 3 Filebeat prospectors. Each prospector points to a different log path, and all of them output to a single Kafka topic called myapp_applog. Everything works fine.
My Filebeat output configuration for a single topic (working):
output.kafka:
  # initial brokers for reading cluster metadata
  hosts: ["broker.1.ip.address:9092", "broker.2.ip.address:9092", "broker.3.ip.address:9092"]
  # message topic selection + partitioning
  topic: 'myapp_applog'
  partition.round_robin:
    reachable_only: false
  required_acks: 1
  compression: gzip
  max_message_bytes: 1000000
What I want to do is send each log file to a separate topic based on a condition (see the documentation section on topics). I have tried this, but no data is being sent to any of the topics. Does anyone know why my conditions do not match, or whether they are written correctly? I can't seem to find an example of how to use the "topics" setting with a per-topic "when" condition.
Here is my Kafka output configuration for multiple topics (not working):
output.kafka:
  # initial brokers for reading cluster metadata
  hosts: ["broker.1.ip.address:9092", "broker.2.ip.address:9092", "broker.3.ip.address:9092"]
  # message topic selection + partitioning
  topics:
    - topic: 'myapp_applog'
      when:
        equals:
          document_type: applog_myappapi
    - topic: 'myapp_applog_stats'
      when:
        equals:
          document_type: applog_myappapi_stats
    - topic: 'myapp_elblog'
      when:
        equals:
          document_type: elblog_myappapi
  partition.round_robin:
    reachable_only: false
  required_acks: 1
  compression: gzip
  max_message_bytes: 1000000
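One thing I have not ruled out: as far as I can tell from the 5.x documentation, the prospector's document_type setting is published as the event's type field, so the conditions may need to test type rather than document_type. A minimal sketch of that variant, assuming this field mapping holds for my version (I have not verified it):

```yaml
output.kafka:
  hosts: ["broker.1.ip.address:9092", "broker.2.ip.address:9092", "broker.3.ip.address:9092"]
  topics:
    # Assumption: the value set via document_type is visible to
    # conditions under the event field "type".
    - topic: 'myapp_applog'
      when:
        equals:
          type: applog_myappapi
    - topic: 'myapp_applog_stats'
      when:
        equals:
          type: applog_myappapi_stats
    - topic: 'myapp_elblog'
      when:
        equals:
          type: elblog_myappapi
  partition.round_robin:
    reachable_only: false
  required_acks: 1
  compression: gzip
  max_message_bytes: 1000000
```

Only the field name inside each equals block differs from the configuration above; the topic names and the remaining Kafka settings are unchanged.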
Here is the full filebeat.yml configuration file:
################### Filebeat Configuration Example #########################
############################# Filebeat ######################################
filebeat.prospectors:
  # App logs - prospector
  - input_type: log
    paths:
      - /myapp/logs/myapp.log
    exclude_lines: [".+? INFO[^*].+", ".+? DEBUG[^*].+"]
    exclude_files: [".gz$", ".tmp"]
    fields:
      api: myappapi
      environment: STG
    ignore_older: 24h
    document_type: applog_myappapi
    scan_frequency: 1s
    # Multiline on Timestamp, YYYY-MM-DD
    # https://www.elastic.co/guide/en/beats/filebeat/master/multiline-examples.html
    multiline:
      pattern: '^[0-9]{4}-[0-9]{2}-[0-9]{2}'
      negate: true
      match: after
      max_lines: 500
      timeout: 5s
  # Server Stats - prospector
  - input_type: log
    paths:
      - /myapp/logs/serverstats.log
    # Exclude messages with log level
    exclude_lines: [".+? ERROR[^*].+", ".+? DEBUG[^*].+"]
    exclude_files: [".gz$", ".tmp"]
    fields:
      api: myappapi
      environment: STG
    ignore_older: 24h
    document_type: applog_myappapi_stats
    scan_frequency: 1s
  # ELB prospector
  -
    input_type: log
    paths:
      - /var/log/httpd/elasticbeanstalk-access_log
    document_type: elblog_myappapi
    fields:
      api: myappapi
      environment: STG
    exclude_lines: [".+? INFO[^*].+", ".+? DEBUG[^*].+"]
    exclude_files: [".gz$", ".tmp"]
    ignore_older: 24h
    # 0s, it is done as often as possible. Default: 10s
    scan_frequency: 1s
registry_file: /var/lib/filebeat/registry
############################# Output ##########################################
# Configure what outputs to use when sending the data collected by the beat.
# Multiple outputs may be used.
output.kafka:
  # initial brokers for reading cluster metadata
  hosts: ["broker.1.ip.address:9092", "broker.2.ip.address:9092", "broker.3.ip.address:9092"]
  # message topic selection + partitioning
  topics:
    - topic: 'myapp_applog'
      when:
        equals:
          document_type: applog_myappapi
    - topic: 'myapp_applog_stats'
      when:
        equals:
          document_type: applog_myappapi_stats
    - topic: 'myapp_elblog'
      when:
        equals:
          document_type: elblog_myappapi
  partition.round_robin:
    reachable_only: false
  required_acks: 1
  compression: gzip
  max_message_bytes: 1000000
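To see which fields the published events actually carry (and therefore what a when condition can match on), one option I am considering is temporarily swapping the Kafka output for the console output. A small sketch, assuming output.kafka is commented out while this is in place:

```yaml
# Temporary debugging output: prints each event as formatted JSON
# so the available field names and values can be inspected.
output.console:
  pretty: true
```

Running Filebeat in the foreground, for example with filebeat -e -c filebeat.yml, should then print the events to the terminal, and the field holding the document_type value can be read off directly.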