Logstash filter based on log file age

Hello.
I am running ELK 6.6 on a CentOS 7 box. I have filebeat configured on a Windows machine to forward specific logs to Logstash.
My Logstash filter has a throttling mechanism set up so that the ELK box is not overwhelmed when live data is shipped from a production system. However, if Filebeat has been down (and this has happened), it is expected to ship the files it missed once it restarts. Filebeat does this, but it processes each log file (~3280 KB) very quickly, so the whole file is shipped within the throttling period and most of the events after the 1002nd are dropped. The log files use a logfile_%{YYYYMMDD}.log naming scheme.
Is there a way for me to say "if the logfile is older than 2 days use this throttling filter instead"?

Here is my current throttle filter for Logstash (note that this is not the whole throttle filter but the relevant part):

    filter {
        throttle {
          period => 30
          max_age => 60
          after_count => 1000
          key => "%{host}"
          add_tag => "throttled"
        }
        if "throttled" in [tags] {
          throttle {
            period => 60
            max_age => 120
            after_count => 2
            key => "%{host}"
            add_tag => "drop"
          }
        }
        if "drop" in [tags] {
          drop { }
        }
    }

Any help would be greatly appreciated.
Thank you

Assuming that @timestamp is set to the original date of the log line, you could do something like this:

    # Age of the event in seconds, kept in @metadata so it is not sent to outputs
    ruby { code => 'event.set("[@metadata][age]", Time.now.to_f - event.timestamp.to_f)' }
    mutate { convert => { "[@metadata][age]" => "integer" } }
    # 172800 seconds = 2 days
    if 172800 < [@metadata][age] {
        # Alternate throttling code
    } else {
        # existing throttling
    }

Thanks for the quick reply, Badger! (Bader and Badger will confuse lots of people.)
What about the case where @timestamp is the time that an event is logged to Elasticsearch and the original timestamp (at the beginning of every line of my logfile) is matched using a grok filter?

If you grok the original timestamp into a field, you can parse that field using a date filter, and use the target option of the date filter to write a timestamp to (for example) "[@metadata][original_ts]". Once that is done you can change the ruby filter to

    ruby { code => 'event.set("[@metadata][age]", Time.now.to_f - event.get("[@metadata][original_ts]").to_f)' }
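
For reference, here is a minimal sketch of how the grok, date, and ruby filters described in this reply might fit together. The grok pattern, the field names `log_ts` and `log_message`, and the ISO8601 date format are assumptions for illustration only, since the actual layout of the log lines is not shown in this thread; adjust them to match the real format.

```
filter {
  # Assumption: every line starts with an ISO8601-style timestamp.
  # log_ts and log_message are hypothetical field names.
  grok {
    match => { "message" => "^%{TIMESTAMP_ISO8601:log_ts} %{GREEDYDATA:log_message}" }
  }

  # Parse the extracted string and write the result to @metadata,
  # leaving @timestamp untouched.
  date {
    match  => [ "log_ts", "ISO8601" ]
    target => "[@metadata][original_ts]"
  }

  # Only compute the age if the date filter produced a timestamp.
  if [@metadata][original_ts] {
    ruby { code => 'event.set("[@metadata][age]", Time.now.to_f - event.get("[@metadata][original_ts]").to_f)' }
    mutate { convert => { "[@metadata][age]" => "integer" } }
  }

  # 172800 seconds = 2 days
  if [@metadata][age] and [@metadata][age] > 172800 {
    # Alternate throttling code
  } else {
    # existing throttling
  }
}
```

Guarding on `[@metadata][original_ts]` means events whose timestamp could not be parsed fall through to the existing throttling rather than failing inside the ruby filter.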

Thank you very much for your help!
I am going to do some extensive testing on this, but initial results suggest that this is the solution.
Really appreciate the quick help!

I will flag this as solved unless I find a problem later on. Thanks again!

Would you know whether this same filter can be applied to Logstash 5.4? For some reason it doesn't seem to work there. Could 5.4 need a different syntax, or be missing an extension?

I would expect it to work the same way in v5.4. For anything older than 5.0 the way you access fields of the event is different, but for 5.4 it should be the same.
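
To illustrate the difference mentioned above, this is roughly how the same ruby filter would look with each event API. It is a sketch reusing the `[@metadata][age]` field from earlier in the thread, not something tested against a specific old release.

```
# Logstash 5.0 and later: fields are read and written with event.get / event.set
ruby { code => 'event.set("[@metadata][age]", Time.now.to_f - event.get("@timestamp").to_f)' }

# Before 5.0: fields were accessed with hash-style syntax on the event
ruby { code => 'event["[@metadata][age]"] = Time.now.to_f - event["@timestamp"].to_f' }
```

Only one of the two lines should be used, depending on the Logstash version in question.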
