I'm using Filebeat 7.16.0 with the Kafka input and the Elasticsearch output.
Previously, a Java application handled this flow. Filebeat uses the same consumer group as that Java application, and messages do appear in Elasticsearch, but the group's committed offsets never advance. This is a problem because Filebeat runs on Kubernetes: if the pod is restarted, Filebeat will reprocess and duplicate the messages it has already shipped.
Current state of the topic:
PARTITION  CURRENT-OFFSET  LOG-END-OFFSET  LAG
11         5986            6399            413
4          6090            6497            407
7          6195            6551            356
1          6045            6468            423
5          6177            6593            416
0          6074            6485            411
9          5977            6414            437
8          6194            6562            368
2          5991            6397            406
6          6160            6558            398
10         6055            6466            411
3          6028            6461            433
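As a sanity check (a throwaway sketch, not part of the setup), the LAG column in this snapshot is consistently LOG-END-OFFSET minus CURRENT-OFFSET, and the total backlog that would be replayed on a pod restart is easy to sum:

```python
# Snapshot of the topic state pasted above: (current, log_end) per partition.
snapshot = {
    11: (5986, 6399), 4: (6090, 6497), 7: (6195, 6551), 1: (6045, 6468),
    5: (6177, 6593), 0: (6074, 6485), 9: (5977, 6414), 8: (6194, 6562),
    2: (5991, 6397), 6: (6160, 6558), 10: (6055, 6466), 3: (6028, 6461),
}

# LAG is the log end offset minus the last committed offset.
lag = {p: end - cur for p, (cur, end) in snapshot.items()}
total_lag = sum(lag.values())
print(total_lag)  # 4879 messages pending across 12 partitions
```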
filebeat.yml:

filebeat.shutdown_timeout: 300s
http.enabled: true

logging:
  level: warn

filebeat.inputs:
  - type: kafka
    hosts: ["kafka-host:19090"]
    topics: ["kafka-topic"]
    group_id: kafka-consumer-group
    sasl.mechanism: PLAIN
    username: username
    password: password

output.elasticsearch:
  hosts: ["https://elastic-host:443"]
  username: username
  password: password
  bulk_max_size: 25
  compression_level: 9
  index: index

processors:
  - decode_json_fields:
      fields: ["message"]
      target: ""
      overwrite_keys: true

setup.ilm.enabled: true
setup.ilm.policy_name: index
setup.ilm.rollover_alias: index
setup.ilm.policy_file: /etc/ilm-policy.json
setup.ilm.overwrite: true

setup.template.name: index
setup.template.pattern: index-*
setup.template.fields: /etc/fields.yml
setup.template.overwrite: true
setup.template.append_fields:
  - name: "@timestamp"
    type: date
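For completeness, the offset snapshot above came from the standard Kafka CLI; something like the following should reproduce it (the client.properties path and its SASL contents are assumptions based on the config above, not something Filebeat itself uses):

```shell
# Describe committed offsets and lag for the Filebeat consumer group.
# client.properties carries the SASL client settings, roughly:
#   security.protocol=SASL_PLAINTEXT
#   sasl.mechanism=PLAIN
#   sasl.jaas.config=org.apache.kafka.common.security.plain.PlainLoginModule required \
#     username="username" password="password";
kafka-consumer-groups.sh \
  --bootstrap-server kafka-host:19090 \
  --command-config client.properties \
  --describe \
  --group kafka-consumer-group
```

If Filebeat were committing offsets, CURRENT-OFFSET in this output would advance between runs instead of staying fixed.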