Hello, community!
My colleague has been struggling with a filebeat issue in his project for 5 months, so every engineer on my team decided to help him.
He's using filebeat 7.14.2 to collect logs and send them to Kafka.
The configuration is similar to the one below.
- Input
  - type: filestream
    enabled: true
    close:
      on_state_change:
        inactive: 1m
        removed: true
    clean_inactive: 1h
    clean_removed: true
    paths:
      - /path/to/logs/**/2022*
- filebeat.yml
  filebeat.config.inputs:
    enabled: true
    path: ${path.config}/inputs.d/*.yml
    reload:
      enabled: true
      period: 60s
  filebeat.registry.flush: 5s
  max_procs: 2
  tags: ["CHANNEL"]
  output.kafka:
    enabled: true
    hosts: [
      "KAFKA_HOST:KAFKA_PORT"
    ]
    topic: "test-topic"
    required_acks: 1
    partition.round_robin:
      reachable: false
    max_message_bytes: 9000000
    compression: snappy
  processors:
    - decode_json_fields:
        fields: ['message']
        target: ""
    - drop_fields:
        fields: ['message']
  logging.level: info
The symptoms are as follows.
- Memory usage of the filebeat process keeps increasing.
- Writing the {6 digit number}.json file in the registry path takes too long once log.json reaches 11MB. (This is because each newly created {6 digit number}.json file is larger than the previous one.)
- States of removed files are present in every {6 digit number}.json file created in the registry path.
- The TTL of every removed file was 0.
To find the cause, another engineer read through the filebeat code in the GitHub repository and tested it himself.
He said GC of removed files was not working because the "resource" of each removed file was considered not finished.
Here's a snippet of the code.
// checkCleanResource returns true for a key-value pair is assumed to be old,
// if is not in use and there are no more pending updates that still need to be
// written to the persistent store anymore.
func checkCleanResource(started, now time.Time, resource *resource) bool {
	if !resource.Finished() {
		return false
	}

	resource.stateMutex.Lock()
	defer resource.stateMutex.Unlock()

	ttl := resource.internalState.TTL
	reference := resource.internalState.Updated
	if started.After(reference) {
		reference = started
	}

	return reference.Add(ttl).Before(now) && resource.stored
}
And here's how filebeat checks whether a resource is finished.
// Finished returns true if the resource is not in use and if there are no pending updates
// that still need to be written to the registry.
func (r *resource) Finished() bool { return r.pending.Load() == 0 }
The engineer who tested it told me that the value of r.pending.Load() for removed files increases to 2 but never drops back to 0. That is why memory keeps increasing: the resource is never released from the in-memory store.
Is there any way to clean up the states of removed files and lower memory usage?
Thanks in advance.