So, my Filebeat runs in the Kubernetes cluster as a DaemonSet.
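For context, the DaemonSet manifest is roughly the sketch below. The image tag, namespace, broker addresses, and registry host path are assumptions for illustration; the parts that matter for this question are the hostPath mounts, since the two inputs read /data/logs and /var/log/pods on each node:

apiVersion: apps/v1
kind: DaemonSet
metadata:
  name: filebeat
  namespace: kube-system            # assumed namespace
spec:
  selector:
    matchLabels:
      app: filebeat
  template:
    metadata:
      labels:
        app: filebeat
    spec:
      serviceAccountName: filebeat  # needs RBAC to watch pods for autodiscover (omitted here)
      containers:
        - name: filebeat
          image: docker.elastic.co/beats/filebeat:7.9.3   # assumed version
          args: ["-c", "/usr/share/filebeat/filebeat.yml", "-e"]
          env:
            - name: KAFKA_HOSTS
              value: "kafka-0:9092,kafka-1:9092"          # placeholder brokers
          securityContext:
            runAsUser: 0            # root, to read host log files
          volumeMounts:
            - name: config          # the filebeat.yml below
              mountPath: /usr/share/filebeat/filebeat.yml
              subPath: filebeat.yml
            - name: data-logs       # read by the first autodiscover input
              mountPath: /data/logs
              readOnly: true
            - name: var-log-pods    # read by the second autodiscover input
              mountPath: /var/log/pods
              readOnly: true
            - name: registry        # persist the registry so files are not re-read on restart
              mountPath: /usr/share/filebeat/data
      volumes:
        - name: config
          configMap:
            name: filebeat-config
        - name: data-logs
          hostPath:
            path: /data/logs
        - name: var-log-pods
          hostPath:
            # if these entries are symlinks, the target dir
            # (e.g. /var/lib/docker/containers) must be mounted too
            path: /var/log/pods
        - name: registry
          hostPath:
            path: /var/lib/filebeat-data                  # assumed host path
            type: DirectoryOrCreate

And this is the filebeat.yml: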
logging.level: info
path.home: "/usr/share/filebeat"
path.config: "/usr/share/filebeat"
path.data: "/usr/share/filebeat/data"
path.logs: "/usr/share/filebeat/logs"
filebeat.autodiscover:
  providers:
    - type: kubernetes
      templates:
        - condition:
            has_fields: ["kubernetes.labels.kafkaTopic"]
          config:
            - type: log
              enabled: true
              paths:
                - /data/logs/${data.kubernetes.labels.service}-${data.kubernetes.labels.cluster}_${data.kubernetes.namespace}/${data.kubernetes.pod.name}/*/*.log
            - type: log
              enabled: true
              symlinks: true
              json.keys_under_root: false
              paths:
                - /var/log/pods/${data.kubernetes.namespace}_${data.kubernetes.pod.name}_${data.kubernetes.pod.uid}/${data.kubernetes.container.name}/*.log
              processors:
                - rename:
                    fields:
                      - from: "json.log"
                        to: "message"
                      - from: "json.stream"
                        to: "stream"
                      - from: "json.time"
                        to: "datetime"
                    ignore_missing: false
                    fail_on_error: true
                - drop_fields:
                    fields: ["json"]
processors:
  - if:
      regexp:
        message: "{.*}"
    then:
      - rename:
          fields:
            - from: "message"
              to: "message_json_str"
          ignore_missing: false
          fail_on_error: true
      - decode_json_fields:
          fields: ["message_json_str"]
          process_array: true
          max_depth: 5
          target: ""
          overwrite_keys: false
          add_error_key: true
      - drop_fields:
          fields: ["message_json_str"]
  - rename:
      fields:
        - from: "log.file.path"
          to: "log_path"
        - from: "kubernetes.replicaset.name"
          to: "kubernetes.replicaset_name"
        - from: "kubernetes.pod.name"
          to: "kubernetes.pod_name"
        - from: "kubernetes.node.name"
          to: "kubernetes.node_name"
        - from: "host.name"
          to: "fagent"
      ignore_missing: false
      fail_on_error: true
  - drop_fields:
      fields:
        - "kubernetes.container"
        - "kubernetes.replicaset"
        - "kubernetes.pod"
        - "kubernetes.node"
        - "kubernetes.labels.pod-template-hash"
        - "agent"
        - "ecs"
        - "log"
        - "input"
        - "host"
output.kafka:
  enabled: true
  hosts: '${KAFKA_HOSTS}'
  topic: "%{[kubernetes.labels.kafkaTopic]}"
  partition.round_robin:
    reachable_only: true
  required_acks: 1
  compression: gzip
  max_message_bytes: 1000000
  channel_buffer_size: 1024
  keep_alive: 60
  client_id: ${HOSTNAME:beats}
  worker: 3
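For context, the application pods carry labels along these lines (values are hypothetical). The kafkaTopic label satisfies the has_fields condition and becomes the Kafka topic, while service and cluster are interpolated into the first input's path:

metadata:
  labels:
    kafkaTopic: my-app-logs   # matched by has_fields; used as the output topic
    service: my-app           # interpolated into the /data/logs/... path
    cluster: prod             # interpolated into the /data/logs/... path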
Now I tested by writing "test" into a log file with the cmd "echo test >> test.log", and the Kafka topic received the same log eight times. I do not know what happened. Can anyone help? Thanks.
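This is roughly how I checked the topic (assuming the stock Kafka console consumer; the broker address and topic name are placeholders):

# consume the topic to inspect what arrived; the single line shows up eight times
kafka-console-consumer.sh --bootstrap-server kafka-0:9092 \
  --topic my-app-logs --from-beginning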