How to ensure the integrity of Filebeat output data during network fluctuations?

I am using Filebeat 9.3.1. I set `max_retries: -1`, expecting that during network fluctuations all data would still be delivered to Kafka. The actual test result: data is still lost during network fluctuations. Under the same conditions I switched the output to console, and there was no data loss; the offsets in the registry kept increasing, which indicates that Filebeat collection itself is working correctly. I also tried changing `required_acks` from -1 to 1 with the Kafka output, but data loss still occurred.

1. I want to know how to ensure that no data is lost.

My config:

```yaml
#=========================== Filebeat inputs =============================

filebeat.inputs:

# Simulated high-reliability scenario: device-side data sync logs

# - type: log
#   enabled: false
#   paths:
#     - /home/eventec/data_collect/filebeatTest/high/*.log*
#   exclude_files: ['.swp$']
#   fields:
#     mqtopic: high-reliability
#     botSn: F240A18010010100
#   backoff: 1s           # if a scan finds no new data, enter backoff mode and wait this long before scanning again
#   backoff_factor: 1     # multiplier for the backoff time; each wait = backoff * backoff_factor
#   max_backoff: 1s
#   close_inactive: 10m
#   # clean_inactive: 5m  # [new] force-clear the file's state after it has been closed, releasing the block
#   idle_timeout: 1s
#   scan_frequency: 10s
#   # ignore_older: 2h    # [new] ignore files older than 2 hours to avoid processing stale logs

- type: filestream
  id: high-reliability-logs
  enabled: true
  paths:
    - /home/eventec/data_collect/filebeatTest/high/*.log*
  exclude_files: ['.swp$']
  fields:
    mqtopic: high-reliability
    botSn: F240A18010010100
  backoff:
    init: 1s        # corresponds to the old backoff: 1s (initial wait)
    max: 10s        # corresponds to the old max_backoff (maximum wait)
  multiplier: 2     # corresponds to the old backoff_factor (growth factor)
  close_inactive: 30m
  scan_frequency: 1s
  take_over:
    enabled: true

#============================= Queue ===============================

queue.disk:
  # Maximum disk space the queue may use (important: prevents filling the disk)
  max_size: 10GB

#============================= Filebeat modules ===============================

# filebeat.registry.path: /home/eventec/fanos/filebeat-data/registry

filebeat.config.modules:
  # Glob pattern for configuration loading
  path: ${path.config}/modules.d/*.yml

  # Set to true to enable config reloading
  reload.enabled: false

  # Period on which files under path should be checked for changes
  #reload.period: 10s

#----------------------------- Kafka output --------------------------------------

output.kafka:
  enabled: true

  # High reliability settings - retry until successful
  max_retries: -1   # effectively infinite retries
  retry_backoff: 1s
  max_retry_backoff: 10s

  # Use acks=all for maximum reliability
  required_acks: -1

  hosts: [""]
  topic: '%{[fields][mqtopic]}'
  compression: gzip
  username: "admin"
  password: "N/A"
  sasl:
    enabled: true
    mechanism: plain
    username: "admin"
    password: "N/A"
  key: '%{[fields.principalId]}'
  partition.hash:
    reachable_only: true

# output.console:
#   pretty: true

#================================ Processors =====================================

# Configure processors to enhance or manipulate events generated by the beat.
processors:
  - add_host_metadata: ~
  - add_cloud_metadata: ~

#============================== HTTP ============================

http:
  enabled: true
  port: 5066

#============================== Logging ============================

logging:
  level: debug
  metrics:
    period: 1s
```

2. I also want to know why `output.events` reports 108 acked (all of my data), yet `pipeline.queue` still shows 108 events.
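For context on question 2: since the config enables the HTTP monitoring endpoint on port 5066, both counters can be read from `GET http://localhost:5066/stats`. Below is a minimal sketch that pulls the two numbers out of that JSON. The sample payload and the exact metric paths (`libbeat.output.events.acked`, `libbeat.pipeline.events.active`) are assumptions based on typical libbeat stats output, not taken from this post.

```python
import json

# Sample payload shaped like a subset of Filebeat's GET /stats response.
# The metric paths used here are an assumption based on typical libbeat stats.
SAMPLE_STATS = json.dumps({
    "libbeat": {
        "output": {"events": {"acked": 108, "failed": 0, "total": 108}},
        "pipeline": {"events": {"active": 108, "published": 108, "total": 108}},
    }
})


def dig(stats: dict, path: str):
    """Walk a dotted path like 'libbeat.output.events.acked' through nested dicts."""
    node = stats
    for key in path.split("."):
        node = node[key]
    return node


def summarize(stats_json: str) -> dict:
    """Return the two counters relevant to question 2 side by side."""
    stats = json.loads(stats_json)
    return {
        "acked_by_output": dig(stats, "libbeat.output.events.acked"),
        "active_in_pipeline": dig(stats, "libbeat.pipeline.events.active"),
    }


print(summarize(SAMPLE_STATS))
```

Comparing these two numbers over successive polls would show whether the pipeline eventually releases the acked events or keeps holding them.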