Forcing Filebeat to drop events when files are deleted

I'm trying to use Filebeat to ingest our rsyslogd log files and send them to Logstash. Unfortunately we sometimes generate quite a large volume of logs, and we have relatively small disks. We have logrotate configured to rotate files when they reach 50M and to keep only one old logfile.

For example, at any one time we will have:

  • /var/log/messages.json (currently being written to by rsyslogd)
  • /var/log/messages.json.1 (rotated away by logrotate)

When the next rotation occurs, /var/log/messages.json.1 is briefly renamed to /var/log/messages.json.2 and then unlinked shortly afterwards.
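
For reference, the relevant logrotate stanza looks roughly like this (a sketch - only the size and rotate settings are known from the above; anything else in our real stanza is omitted):

/var/log/messages.json {
    # rotate at 50M; keep only one old copy
    size 50M
    rotate 1
}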

If Filebeat is still reading events from /var/log/messages.json.2 when logrotate unlinks it, I want those events to be dropped - dropping them is preferable to holding the file open and risking filling the disk. I've experimented with setting the close_renamed and close_removed options, but I've not had any luck. It seems Filebeat won't actually notice that a file has been renamed or removed until it hits EOF on it?

Is it possible to beat Filebeat into submission here?

Here is my config:

filebeat.prospectors:
  - input_type: log
    paths:
      - /var/log/messages.json
      - /var/log/messages.json.*
    document_type: app_log
    scan_frequency: 10s
    clean_removed: true
    close_removed: true
    close_renamed: true
    encoding: utf-8
    json.keys_under_root: true
    tags: [ "app_log" ]
logging.level: warning
logging.to_syslog: true
logging.to_files: false
output.logstash:
  hosts: [ ... ]
tags: [ ... ]

Here is some debug output from when the file is rotated away:

2017/05/22 05:29:58.655922 spooler.go:89: DBG  Flushing spooler because of timeout. Events flushed: 0
2017/05/22 05:30:03.043466 metrics.go:34: INFO No non-zero metrics in the last 30s
2017/05/22 05:30:03.145456 prospector.go:183: DBG  Run prospector
2017/05/22 05:30:03.145532 prospector_log.go:70: DBG  Start next scan
2017/05/22 05:30:03.145573 prospector_log.go:91: DBG  Prospector states cleaned up. Before: 0, After: 0
2017/05/22 05:30:03.656172 spooler.go:89: DBG  Flushing spooler because of timeout. Events flushed: 0
----------- ROTATION HAPPENS HERE --------
2017/05/22 05:30:06.686754 prospector.go:183: DBG  Run prospector
2017/05/22 05:30:06.686803 prospector_log.go:70: DBG  Start next scan
2017/05/22 05:30:06.686883 prospector_log.go:226: DBG  Check file for harvesting: /var/log/messages.json
2017/05/22 05:30:06.686896 prospector_log.go:259: DBG  Update existing file for harvesting: /var/log/messages.json, offset: 31864380
2017/05/22 05:30:06.686900 prospector_log.go:268: DBG  Resuming harvesting of file: /var/log/messages.json, offset: 31864380
2017/05/22 05:30:06.689360 log.go:283: DBG  Set previous offset for file: /var/log/messages.json. Offset: 31864380 
2017/05/22 05:30:06.689409 log.go:273: DBG  Setting offset for file: /var/log/messages.json. Offset: 31864380 
2017/05/22 05:30:06.689434 prospector_log.go:91: DBG  Prospector states cleaned up. Before: 3, After: 3
2017/05/22 05:30:06.689485 log.go:91: INFO Harvester started for file: /var/log/messages.json
2017/05/22 05:30:06.794702 spooler.go:119: DBG  Flushing spooler because spooler full. Events flushed: 2048
2017/05/22 05:30:06.936517 spooler.go:119: DBG  Flushing spooler because spooler full. Events flushed: 2048
2017/05/22 05:30:06.974032 sync.go:85: ERR Failed to publish events caused by: write tcp 172.18.0.9:60376->172.18.0.48:5044: write: connection reset 
2017/05/22 05:30:06.974058 single.go:91: INFO Error publishing events (retrying): write tcp 172.18.0.9:60376->172.18.0.48:5044: write: connection 
2017/05/22 05:30:07.048352 spooler.go:119: DBG  Flushing spooler because spooler full. Events flushed: 2048
2017/05/22 05:30:08.916360 registrar.go:275: DBG  Processing 2048 events
2017/05/22 05:30:08.919833 registrar.go:261: DBG  Registrar states cleaned up. Before: 22, After: 22
2017/05/22 05:30:08.919878 registrar.go:298: DBG  Write registry file: /var/lib/filebeat/registry
2017/05/22 05:30:08.921764 registrar.go:323: DBG  Registry file updated. 22 states written.
2017/05/22 05:30:09.096856 spooler.go:119: DBG  Flushing spooler because spooler full. Events flushed: 2048
2017/05/22 05:30:10.444011 registrar.go:275: DBG  Processing 2048 events
2017/05/22 05:30:10.445404 registrar.go:261: DBG  Registrar states cleaned up. Before: 22, After: 22
2017/05/22 05:30:10.445421 registrar.go:298: DBG  Write registry file: /var/lib/filebeat/registry
2017/05/22 05:30:10.448441 registrar.go:323: DBG  Registry file updated. 22 states written.
2017/05/22 05:30:10.594262 spooler.go:119: DBG  Flushing spooler because spooler full. Events flushed: 2048
2017/05/22 05:30:11.927514 registrar.go:275: DBG  Processing 2048 events
2017/05/22 05:30:11.933515 registrar.go:261: DBG  Registrar states cleaned up. Before: 22, After: 22
2017/05/22 05:30:11.933543 registrar.go:298: DBG  Write registry file: /var/lib/filebeat/registry
2017/05/22 05:30:11.951635 registrar.go:323: DBG  Registry file updated. 22 states written.
2017/05/22 05:30:12.155687 spooler.go:119: DBG  Flushing spooler because spooler full. Events flushed: 2048
2017/05/22 05:30:12.829729 registrar.go:275: DBG  Processing 2048 events
2017/05/22 05:30:12.835411 registrar.go:261: DBG  Registrar states cleaned up. Before: 22, After: 22
2017/05/22 05:30:12.835433 registrar.go:298: DBG  Write registry file: /var/lib/filebeat/registry
2017/05/22 05:30:12.837879 registrar.go:323: DBG  Registry file updated. 22 states written.
2017/05/22 05:30:12.986000 spooler.go:119: DBG  Flushing spooler because spooler full. Events flushed: 2048
2017/05/22 05:30:13.145740 prospector.go:183: DBG  Run prospector
2017/05/22 05:30:13.145806 prospector_log.go:70: DBG  Start next scan
2017/05/22 05:30:13.145840 prospector_log.go:91: DBG  Prospector states cleaned up. Before: 0, After: 0
2017/05/22 05:30:13.992171 registrar.go:275: DBG  Processing 2048 events
2017/05/22 05:30:13.993927 registrar.go:261: DBG  Registrar states cleaned up. Before: 22, After: 22
2017/05/22 05:30:13.993955 registrar.go:298: DBG  Write registry file: /var/lib/filebeat/registry
2017/05/22 05:30:13.994847 registrar.go:323: DBG  Registry file updated. 22 states written.
2017/05/22 05:30:14.181343 spooler.go:119: DBG  Flushing spooler because spooler full. Events flushed: 2048

I think what is happening in your case is that the file is closed after the rename, but the prospector then detects it again and reopens it. That makes sense given your config: /var/log/messages.json.* matches the renamed files. So instead you should have /var/log/messages.json.1 there, so that the other rotated files are ignored.
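
Something like this, keeping everything else the same:

filebeat.prospectors:
  - input_type: log
    paths:
      - /var/log/messages.json
      - /var/log/messages.json.1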

Sure - but that means the Filebeat config needs to change whenever the logrotate config does. What does close_removed do, if not close files that have been unlinked by some other process?

What I ended up doing is adding a postrotate script to my logrotate config that truncates the .2 files before they get unlinked. As a bonus, it also checks the size of the file against the position stored in the Filebeat registry, so we can be alerted when log messages are dropped. When I get to the office this morning I'll post up the script.

It would be a good feature for Filebeat to manage this process itself and to provide a metric for how much of a logfile was deleted before being sent!

Why does the config have to be changed when logrotate is changed? close_removed closes the file once it is removed. In the output above the relevant option is close_renamed: it closes the file, but the prospector then finds it again and opens it again.

TBH I don't fully understand yet why you need this special magic. Perhaps the script will give me the missing details.

As in: if the number of log files retained by logrotate changes, the Filebeat config would need a matching change. With rotate 1, the transient file is .2; if we changed logrotate to keep two old files, the transient file would become .3, so a paths list that spells out the suffixes would have to be updated to match.

This is what I've ended up doing. As a bonus, it also writes metrics to Prometheus (via the textfile collector) about how much log data was truncated.

#!/usr/bin/env ruby

require 'json'
require 'trollop'

@opts = Trollop::options do
    banner "Truncate files that logrotate has rotated away"
    opt :glob, "Logrotate pattern from the stanza", :type => :string, :required => true
    opt :maxrotate, "Maximum number of rotated (.foo) files to keep", :type => :int, :required => true
    opt :filebeatdb, "Location of filebeat database json file", :type => :string, :default => "/var/lib/filebeat/registry"
    opt :promcollectdir, "Location of directory for prometheus textfile collector", :type => :string, :default => "/var/lib/prometheus-textfile/"
end

puts "Running with glob #{@opts[:glob]} and maxrotate #{@opts[:maxrotate]}"

@filebeat_registry = JSON.parse(File.read(@opts[:filebeatdb]))
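# The registry (filebeat 5.x) is a JSON array of per-file state entries; the
# fields this script relies on look roughly like this (inode/device values
# illustrative):
#   { "source" => "/var/log/messages.json", "offset" => 31864380,
#     "FileStateOS" => { "inode" => 123456, "device" => 2049 } }
# We match entries by inode+device rather than by "source", since by the time
# we run, the file has been renamed away from its original path.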

def truncate_and_return_fb_pos(fname)
    begin
        filebeat_pos = nil
        file_size = nil

        File.open(fname, File::WRONLY) do |f|
            stat_info = f.stat
            file_size = stat_info.size

            filebeat_entry = @filebeat_registry.find do |entry|
                entry['FileStateOS']['inode'] == stat_info.ino &&
                    entry['FileStateOS']['device'] == stat_info.dev
            end
            if filebeat_entry
                puts "File #{fname} is #{stat_info.size} bytes, filebeat has read #{filebeat_entry['offset']} bytes"
                filebeat_pos = filebeat_entry['offset']
            else
                puts "File #{fname} is not tracked by filebeat (or is no longer) - not writing any metrics."
            end

            f.truncate 0
        end

        return [filebeat_pos, file_size]
    rescue Errno::ENOENT
        puts "File #{fname} appears to have been unlinked before truncation. Continuing..."
        return [nil, 0]
    end
end

def write_metrics_for_logfile(fname:, pattern:, filebeat_pos:, file_size:)
    # We only write two metrics: file_size - filebeat_pos as "filebeat_logrotate_bytes_dropped", and
    # file_size as "filebeat_logrotate_bytes_truncated". The latter is really only so we know this script
    # is actually doing something in production; it tells us very little else that is useful.
    # This is very un-prometheus - in theory, we would track the number of bytes written to logfiles
    # and the bytes read by filebeat as separate metrics, and subtract them in a query.
    # However, we can't reliably know from here how many bytes filebeat has read, because it might have already
    # closed and forgotten the logfiles before we try to truncate them here.
    # We can't even really measure how much logfile has been written from this script as-is, because we are
    # only looking at files that are about to be unlinked - we'll always be lagging up to 50M behind the
    # real metric.

    metric_fname = File.join(@opts[:promcollectdir], pattern.gsub('/', '__') + ".prom")
    File.open(metric_fname, File::RDWR | File::CREAT) do |f|
        f.flock(File::LOCK_EX)
        # Read current counter values
        current_text = f.read

        filebeat_logrotate_bytes_dropped_current = current_text.lines.find { |l|
            l.start_with? 'filebeat_logrotate_bytes_dropped'
        }&.match(/^filebeat_logrotate_bytes_dropped{.*}\s+([0-9]+)\s+[0-9]+$/)&.to_a&.at(1)&.to_i
        puts "Current value of filebeat_logrotate_bytes_dropped_current: #{filebeat_logrotate_bytes_dropped_current}"
        filebeat_logrotate_bytes_dropped_current = 0 if filebeat_logrotate_bytes_dropped_current.nil?

        filebeat_logrotate_bytes_truncated_current = current_text.lines.find { |l|
            l.start_with? 'filebeat_logrotate_bytes_truncated'
        }&.match(/^filebeat_logrotate_bytes_truncated{.*}\s+([0-9]+)\s+[0-9]+$/)&.to_a&.at(1)&.to_i
        filebeat_logrotate_bytes_truncated_current = 0 if filebeat_logrotate_bytes_truncated_current.nil?

        f.truncate 0
        f.seek 0

        filebeat_logrotate_bytes_dropped_current += (file_size - filebeat_pos) unless filebeat_pos.nil?
        filebeat_logrotate_bytes_truncated_current += file_size
        timestamp = Time.now.to_i * 1000 # in ms

        f.write "# HELP filebeat_logrotate_bytes_dropped The number of bytes of log that filebeat did not see before truncation\n"
        f.write "# TYPE filebeat_logrotate_bytes_dropped counter\n"
        f.write "filebeat_logrotate_bytes_dropped{pattern=\"#{pattern.gsub('"', '_')}\"} #{filebeat_logrotate_bytes_dropped_current} #{timestamp}\n"

        f.write "# HELP filebeat_logrotate_bytes_truncated The number of bytes of log that have been truncated away\n"
        f.write "# TYPE filebeat_logrotate_bytes_truncated counter\n"
        f.write "filebeat_logrotate_bytes_truncated{pattern=\"#{pattern.gsub('"', '_')}\"} #{filebeat_logrotate_bytes_truncated_current} #{timestamp}\n"
    end
end

# Note that we get space-separated globs from logrotate - expand them into individual patterns
glob_patterns = @opts[:glob].split(/ |\n/)
puts "Glob patterns broken out as: #{glob_patterns.inspect}"
glob_patterns.each do |pattern|
    # Look for the input logrotate pattern + a numeric suffix
    Dir.glob(pattern + '.*').each do |fname|
        m = /\.([0-9]+)$/.match(fname)
        next if m.nil? # Not actually a .numeric suffix

        suffix_num = m[1].to_i
        if suffix_num > @opts[:maxrotate]
            puts "Found file #{fname} with suffix \##{suffix_num} > #{@opts[:maxrotate]} - truncating it."

            filebeat_pos, file_size = truncate_and_return_fb_pos fname
            write_metrics_for_logfile(fname: fname, pattern: pattern, filebeat_pos: filebeat_pos, file_size: file_size)
        else
            puts "Found file #{fname} with allowable suffix - not truncating."
        end
    end
end
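
For completeness, the script hangs off logrotate roughly like this (the install path is hypothetical; with sharedscripts, logrotate passes the whole glob pattern to the postrotate script as "$1"):

/var/log/messages.json {
    size 50M
    rotate 1
    sharedscripts
    postrotate
        /usr/local/bin/truncate_rotated_logs --glob "$1" --maxrotate 1
    endscript
}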

Glad you found a solution, and thanks for sharing the script. I see the value of being able to see how much of a file has been processed, or how far Filebeat lags behind, especially under high load. Perhaps we can have something like that as part of centralised monitoring in the future; it does not really fit as part of the events in Filebeat. One thing that could be done is adding the "size" of the file to each event. It would require checking the file every time, which is kind of bad, but it would make it possible to capture, at the moment an event was read, how far behind Filebeat was. That could then be used to do some diff calculations in Kibana. This meta info is similar to what we have here: https://github.com/elastic/beats/pull/2279
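
Roughly, an event would then carry something like this (the file_size field is hypothetical - it does not exist today), so that file_size - offset at read time gives the lag:

{
  "source": "/var/log/messages.json",
  "offset": 31864380,
  "file_size": 52428800,
  "message": "..."
}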

Perhaps the most appropriate way to expose that data is some kind of Metricbeat integration. We're not really using Elastic for metrics, but I think Metricbeat is flexible enough to plug into other workflows.
