Hi all,
Our data pipeline uses Filebeat to send logs to Kafka. Unfortunately, every once in a while the amount of data sent by the Filebeat instances exceeds what Kafka can handle. Therefore, we would like to put an upper limit on the amount of data each Filebeat instance can send.
I found “bulk_max_size” and “bulk_flush_frequency” in the doc below. The way I understand these settings (sketched after the link) is:
“bulk_max_size”: controls how many events (i.e., lines of a log file) can be sent in a single Kafka request
“bulk_flush_frequency”: controls how often a request can be sent, in seconds
https://www.elastic.co/guide/en/beats/filebeat/master/kafka-output.html
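Based on that reading, I expected a minimal output.kafka block like the sketch below to cap throughput at one event per minute (the broker address and topic here are just placeholders):

output.kafka:
  hosts: ["kafka:9092"]       # placeholder broker address
  topic: 'test'               # placeholder topic
  bulk_max_size: 1            # my reading: at most one event per Kafka request
  bulk_flush_frequency: 60    # my reading: wait 60 seconds between requests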
So I set up one Filebeat container and one Kafka container (plus ZooKeeper) for my test. The docker-compose configuration is listed below.
I also created the Filebeat configuration file shown below. With this configuration, I was hoping my Filebeat test instance would send out one line of log every 60 seconds. However, I found that it always picks up all the latest lines (10 lines in my test) and sends them to Kafka at the same time. In other words, the amount of data sent to Kafka exceeds the “bulk_max_size” specified in the configuration file.
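To make the expectation concrete: with the codec.format string in the filebeat.yml below, I was hoping to see messages like the following arrive on the topic one per minute (timestamps and text made up for illustration):

2019-02-14T10:00:00.000Z test log line 1
2019-02-14T10:01:00.000Z test log line 2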
Do I understand the meaning of “bulk_max_size” and “bulk_flush_frequency” correctly? What can I do to limit the amount of data sent by Filebeat? Thanks.
## the docker-compose configuration of my test containers
zookeeper:
  image: confluentinc/cp-zookeeper:5.1.0
  container_name: zookeeper
  environment:
    - ZOOKEEPER_CLIENT_PORT=2181
  ports:
    - 2181:2181
kafka:
  image: confluentinc/cp-kafka:5.1.0
  container_name: kafka
  environment:
    - KAFKA_ZOOKEEPER_CONNECT=zookeeper:2181
    - KAFKA_ADVERTISED_LISTENERS=PLAINTEXT://10.154.177.6:9092
    - KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR=1
  ports:
    - 9092:9092
  links:
    - zookeeper
filebeat:
  image: docker.elastic.co/beats/filebeat:6.6.0
  container_name: filebeat
  user: root
  environment:
    - strict.perms=false
  volumes:
    - '/Users/wo/filebeat.yml:/usr/share/filebeat/filebeat.yml:ro'
    - '/Users/wo/filebeat_logs:/usr/share/filebeat/data:rw'
    - '/Users/wo/filebeat_logs/logs:/usr/share/services/logs'
# the filebeat.yml
filebeat.prospectors:
- type: log
  enabled: true
  tags:
    - execer
  paths:
    - "/usr/share/services/logs/*"

queue.mem:
  events: 4096

output.kafka:
  # format each event as timestamp + message only; otherwise Filebeat
  # serializes the whole event as JSON before publishing it to Kafka
  codec.format:
    string: '%{[@timestamp]} %{[message]}'
  hosts: ["10.154.177.255:9092"]
  topic: 'foo2'
  partition.round_robin:
    reachable_only: false
  required_acks: 1
  max_message_bytes: 1000000
  bulk_flush_frequency: 60
  bulk_max_size: 1
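One more thing I wondered about: per the queue.mem docs there are also flush settings (flush.min_events and flush.timeout) that I left at their defaults. If those control when buffered events are released to the output, maybe something like the sketch below would at least turn the steady stream into one batch per minute (the values are guesses on my part, not tested):

queue.mem:
  events: 4096
  flush.min_events: 2048   # release a batch once 2048 events are buffered...
  flush.timeout: 60s       # ...or after 60 seconds, whichever comes first

With only ~10 lines buffered in my test, my understanding is this would release them as a single batch every 60 seconds, rather than one event per minute as I originally hoped.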