Throttling plugin not working with a lot of messages

Hi all

We use the logstash-filter-throttle plugin to reduce logs of a certain type to one message per 5 minutes, which we then use as a kind of health message.

filter {
  if [labels][input_type]
    and [labels][context] == "usp"
    and [labels][env] == "prod" {

    throttle {
      before_count => -1
      after_count  => 1
      period       => 300
      max_age      => 600
      key          => "%{[labels][context]}_%{[labels][env]}_%{[labels][input_type]}"
      add_tag      => "logstash_health_throttled"
    }
    if "logstash_health_throttled" not in [tags] {
      # do something here
    }
  }
}

There are ~60000 messages per 5 minutes.

Now the problem is that too many messages end up without the "logstash_health_throttled" tag. With debug mode enabled, I see far too many messages counted as 1:

[root@lsfilter03.usp.prod ~]# grep "14:30.*usp_prod_beats.*count=>1}" /var/log/logstash.log
[2022-03-22T14:30:01,790][DEBUG][logstash.filters.throttle][roles__elknode__filter][5223b55197fe4f26af2ab01a47351251af26360ff812b3beefcb37e97309c5b3] filters/LogStash::Filters::Throttle: counter incremented {:key=>"usp_prod_beats", :epoch=>1647955800, :timeslot=>1647955800, :count=>1}
[2022-03-22T14:30:03,131][DEBUG][logstash.filters.throttle][roles__elknode__filter][5223b55197fe4f26af2ab01a47351251af26360ff812b3beefcb37e97309c5b3] filters/LogStash::Filters::Throttle: counter incremented {:key=>"usp_prod_beats", :epoch=>1647955799, :timeslot=>1647955500, :count=>1}
[2022-03-22T14:30:07,693][DEBUG][logstash.filters.throttle][roles__elknode__filter][5223b55197fe4f26af2ab01a47351251af26360ff812b3beefcb37e97309c5b3] filters/LogStash::Filters::Throttle: counter incremented {:key=>"usp_prod_beats", :epoch=>1647955805, :timeslot=>1647955805, :count=>1}
[2022-03-22T14:30:07,704][DEBUG][logstash.filters.throttle][roles__elknode__filter][5223b55197fe4f26af2ab01a47351251af26360ff812b3beefcb37e97309c5b3] filters/LogStash::Filters::Throttle: counter incremented {:key=>"usp_prod_beats", :epoch=>1647955801, :timeslot=>1647955505, :count=>1}
[2022-03-22T14:30:11,515][DEBUG][logstash.filters.throttle][roles__elknode__filter][5223b55197fe4f26af2ab01a47351251af26360ff812b3beefcb37e97309c5b3] filters/LogStash::Filters::Throttle: counter incremented {:key=>"usp_prod_beats", :epoch=>1647955801, :timeslot=>1647955801, :count=>1}
[2022-03-22T14:30:17,861][DEBUG][logstash.filters.throttle][roles__elknode__filter][5223b55197fe4f26af2ab01a47351251af26360ff812b3beefcb37e97309c5b3] filters/LogStash::Filters::Throttle: counter incremented {:key=>"usp_prod_beats", :epoch=>1647955815, :timeslot=>1647955815, :count=>1}
[2022-03-22T14:30:17,864][DEBUG][logstash.filters.throttle][roles__elknode__filter][5223b55197fe4f26af2ab01a47351251af26360ff812b3beefcb37e97309c5b3] filters/LogStash::Filters::Throttle: counter incremented {:key=>"usp_prod_beats", :epoch=>1647955814, :timeslot=>1647955515, :count=>1}
[2022-03-22T14:30:22,633][DEBUG][logstash.filters.throttle][roles__elknode__filter][5223b55197fe4f26af2ab01a47351251af26360ff812b3beefcb37e97309c5b3] filters/LogStash::Filters::Throttle: counter incremented {:key=>"usp_prod_beats", :epoch=>1647955820, :timeslot=>1647955820, :count=>1}
[2022-03-22T14:30:22,636][DEBUG][logstash.filters.throttle][roles__elknode__filter][5223b55197fe4f26af2ab01a47351251af26360ff812b3beefcb37e97309c5b3] filters/LogStash::Filters::Throttle: counter incremented {:key=>"usp_prod_beats", :epoch=>1647955819, :timeslot=>1647955520, :count=>1}
[2022-03-22T14:30:25,990][DEBUG][logstash.filters.throttle][roles__elknode__filter][5223b55197fe4f26af2ab01a47351251af26360ff812b3beefcb37e97309c5b3] filters/LogStash::Filters::Throttle: counter incremented {:key=>"usp_prod_beats", :epoch=>1647955823, :timeslot=>1647955823, :count=>1}
[2022-03-22T14:30:31,995][DEBUG][logstash.filters.throttle][roles__elknode__filter][5223b55197fe4f26af2ab01a47351251af26360ff812b3beefcb37e97309c5b3] filters/LogStash::Filters::Throttle: counter incremented {:key=>"usp_prod_beats", :epoch=>1647955830, :timeslot=>1647955830, :count=>1}
[2022-03-22T14:30:31,995][DEBUG][logstash.filters.throttle][roles__elknode__filter][5223b55197fe4f26af2ab01a47351251af26360ff812b3beefcb37e97309c5b3] filters/LogStash::Filters::Throttle: counter incremented {:key=>"usp_prod_beats", :epoch=>1647955828, :timeslot=>1647955530, :count=>1}
[2022-03-22T14:30:36,029][DEBUG][logstash.filters.throttle][roles__elknode__filter][5223b55197fe4f26af2ab01a47351251af26360ff812b3beefcb37e97309c5b3] filters/LogStash::Filters::Throttle: counter incremented {:key=>"usp_prod_beats", :epoch=>1647955835, :timeslot=>1647955835, :count=>1}
[2022-03-22T14:30:36,030][DEBUG][logstash.filters.throttle][roles__elknode__filter][5223b55197fe4f26af2ab01a47351251af26360ff812b3beefcb37e97309c5b3] filters/LogStash::Filters::Throttle: counter incremented {:key=>"usp_prod_beats", :epoch=>1647955832, :timeslot=>1647955535, :count=>1}
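
(For completeness: I raised the log level for just the throttle filter via the Logstash logging API, roughly like the call below; the logger name is the one shown in the log lines above.)

curl -XPUT 'localhost:9600/_node/logging' -H 'Content-Type: application/json' -d '
{
  "logger.logstash.filters.throttle" : "DEBUG"
}'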

If I convert the timestamps to a human-readable format, I get the following output (logging timestamp, epoch, timeslot):

[root@lsfilter03.usp.prod ~]# grep "14:30.*usp_prod_beats.*count=>1}" /var/log/logstash.log | sed -e 's/\[DEBUG.*epoch=>/ /' -e 's/, :timeslot=>/ /' -e 's/, .*$//' | awk '{ epoch = strftime("%H:%M:%S", $2); timeslot = strftime("%H:%M:%S", $3); print $1 " " epoch " " timeslot }'
[2022-03-22T14:30:01,790] 14:30:00 14:30:00
[2022-03-22T14:30:03,131] 14:29:59 14:25:00
[2022-03-22T14:30:07,693] 14:30:05 14:30:05
[2022-03-22T14:30:07,704] 14:30:01 14:25:05
[2022-03-22T14:30:11,515] 14:30:01 14:30:01
[2022-03-22T14:30:17,861] 14:30:15 14:30:15
[2022-03-22T14:30:17,864] 14:30:14 14:25:15
[2022-03-22T14:30:22,633] 14:30:20 14:30:20
[2022-03-22T14:30:22,636] 14:30:19 14:25:20
[2022-03-22T14:30:25,990] 14:30:23 14:30:23
[2022-03-22T14:30:31,995] 14:30:30 14:30:30
[2022-03-22T14:30:31,995] 14:30:28 14:25:30
[2022-03-22T14:30:36,029] 14:30:35 14:30:35
[2022-03-22T14:30:36,030] 14:30:32 14:25:35

I have also tried to configure the plugin the other way around (tag one message every 5 minutes), but that did not help.

Logstash is running with 16 pipeline workers. Could it be that the plugin is still not thread-safe?

Or could it be that the value of timeslot_cache.created is set incorrectly?

Thanks for your help.

Kind regards
Elmar

The timeslot is set here: timeslot will be equal to epoch if timeslot_cache.created is equal to epoch, in other words, if the cache entry has just been created. Are you getting any WARN messages about the cache?

There were some warnings about a disappearing timeslot about an hour earlier, but there were no such messages while I captured the debug output.

Having a look at these three events (which all set the counter to 1):

[2022-03-22T14:30:01,790] 14:30:00 14:30:00
[2022-03-22T14:30:03,131] 14:29:59 14:25:00
[2022-03-22T14:30:07,693] 14:30:05 14:30:05

The first event created the timeslot cache entry with 14:30:00. The third event set it to 14:30:05. How can that be when the period is set to 5 minutes?
The calculation should be:

timeslot_key = 1647955805 - (1647955805 - 1647955800) % 300 = 1647955800

But the value is 1647955805 (a new timeslot).
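
To make the expected behaviour concrete, here is the same calculation as a small Ruby check (my own reconstruction of the formula above, not the plugin source):

period  = 300                  # 5 minutes
created = 1647955800           # 14:30:00, when the timeslot cache entry was created
epoch   = 1647955805           # 14:30:05, epoch of the third event

timeslot_key = epoch - (epoch - created) % period
puts timeslot_key              # => 1647955800, i.e. still the 14:30:00 timeslot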

Any ideas?

Does the problem go away if you set pipeline.workers to 1?

Sorry for the late reply. Yes, I can confirm that the problem does not appear if pipeline.workers is set to 1.
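
For reference, pipeline.workers is the standard Logstash setting; it can be set in logstash.yml or passed at startup:

# logstash.yml
pipeline.workers: 1

# or on the command line
bin/logstash -w 1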

There has been a PR pending for 5 years to replace Atomic and ThreadSafe (both long since unsupported) with Concurrent. A comment on that PR claims the existing plugin is broken, suggesting it is not actually thread-safe. In that case you will have to run with pipeline.workers set to 1.

Thanks for the reply.

Would the PR improve thread safety? Having a look at these two lines:

    timeslot.update { |v| v + 1 }          # increment counter
    count = timeslot.value                    # get latest counter value
  1. Worker 1 sets the timeslot value to 1.
  2. Directly afterwards, worker 2 sets the timeslot value to 2.
  3. Worker 1 reads the value, which is now 2.
  4. Worker 2 reads the value too and also gets 2 (see the sketch below).
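
A minimal sketch of that interleaving, assuming the Concurrent::AtomicFixnum the PR would introduce (the simulation runs sequentially, but it reproduces the order of operations described above):

require 'concurrent'

# one counter per timeslot, shared by all pipeline workers
timeslot = Concurrent::AtomicFixnum.new(0)

timeslot.update { |v| v + 1 }    # worker 1 increments (counter is now 1)
timeslot.update { |v| v + 1 }    # worker 2 increments (counter is now 2)

count_worker_1 = timeslot.value  # worker 1 reads 2, not "its" count of 1
count_worker_2 = timeslot.value  # worker 2 also reads 2

puts count_worker_1 == count_worker_2   # => true, both workers see the same count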

Wouldn't it be better to use a mutex to guarantee mutually exclusive access? For example:

public
def register
  @keycache_mutex = Mutex.new
end

public
def filter(event)
  @keycache_mutex.synchronize {
    # here comes the whole throttle code, or at least the part that reads
    # and updates the keycache
  }
end

What do you think about this?

timeslot would be (if the PR were merged) an AtomicFixnum. So the call to .update is thread-safe, but I think you are right: the call to .value is not synchronized and might return the same value for multiple threads.

I think it is only those two lines that require a mutex, but I am not completely clear on what the cache initialization code is doing, so I could be wrong.

Hi Badger

Thanks for the reply. So you think the code below would solve the problem?

@keycache_mutex.synchronize {
  timeslot.update { |v| v + 1 }   # increment counter
  count = timeslot.value          # get latest counter value
}

Would you be able to create a pull request?

Kind regards
Elmar

I do not know enough about Ruby to be sure about that.

Anyone can create a PR. Few can merge. I cannot.

OK, no problem. I will try to create a pull request.

This topic was automatically closed 28 days after the last reply. New replies are no longer allowed.