Rate limit processor not working as it should

Hi.

We've been using ES for a while now and noticed that some of our apps produce duplicate log messages whose rate we'd like to limit (not drop them entirely), so we've started using the rate_limit processor (Rate limit the flow of events | Filebeat Reference [7.17] | Elastic).

I'm trying to rate limit the [Client X] messages with this processor:

  processors:
    - if.regexp.message: '\[Client \d+\]'
      then.rate_limit.limit: "1/s"
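
For clarity, here is the same thing written out in the expanded form from the conditions documentation; as far as I can tell, the two notations should be equivalent:

  processors:
    - if:
        regexp:
          message: '\[Client \d+\]'
      then:
        - rate_limit:
            limit: "1/s"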

At first, right after Filebeat is started, this seems to work, as can be seen in the debug logs:

2022-11-14T15:03:40.325Z        DEBUG   [publisher]     pipeline/client.go:231  Pipeline client receives callback 'onFilteredOut' for event: {Timestamp:2022-11-14 15:03:40.325690169 +0000 UTC m=+270.534639612 Meta:null Fields:{"input":{"type":"filestream"},"log":{"file":{"path":"/var/log/roslogs/5a66470a-5068-11ed-a74b-45018b8c0b2f/rosout.log"},"offset":560885},"message":"1666268758.354392051 INFO [Client 10] Subscribed to /is_workplace_selected"} Private:0xc0012ff000 TimeSeries:false}
2022-11-14T15:03:40.325Z        DEBUG   [publisher]     pipeline/client.go:231  Pipeline client receives callback 'onFilteredOut' for event: {Timestamp:2022-11-14 15:03:40.325692030 +0000 UTC m=+270.534641473 Meta:null Fields:{"input":{"type":"filestream"},"log":{"file":{"path":"/var/log/roslogs/5a66470a-5068-11ed-a74b-45018b8c0b2f/rosout.log"},"offset":561403},"message":"1666268758.362668037 INFO [Client 10] Subscribed to /is_workplace_selected"} Private:0xc0012ff580 TimeSeries:false}
2022-11-14T15:03:40.325Z        DEBUG   [publisher]     pipeline/client.go:231  Pipeline client receives callback 'onFilteredOut' for event: {Timestamp:2022-11-14 15:03:40.325793339 +0000 UTC m=+270.534742781 Meta:null Fields:{"input":{"type":"filestream"},"log":{"file":{"path":"/var/log/roslogs/5a66470a-5068-11ed-a74b-45018b8c0b2f/rosout.log"},"offset":561921},"message":"1666268758.364773988 INFO [Client 10] Subscribed to /is_workplace_selected"} Private:0xc0012ffb00 TimeSeries:false}
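
(For reference, these debug lines come from running Filebeat with debug logging enabled, with something like this in filebeat.yml:)

  logging.level: debug
  logging.selectors: ["publisher", "processors"]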

I assume the onFilteredOut callback means the messages are being filtered/rate limited as intended. Later on, though, after Filebeat's initial start-up, no messages get filtered anymore: I see several such logs in Kibana, and I can easily tell they are not rate limited (for instance, they are all posted within the same second). An example of such a log is:

2022-11-14T15:23:09.414Z        DEBUG   [processors]    processing/processors.go:128    Fail to apply processor client{rate_limit=[limit=[{1 m}],fields=[[]],algorithm=[token_bucket]], dissect=%{time} %{severity} %{node} [%{program.name}:%{program.line}(%{program.function})] [topics: %{topics}] %{msg},field=message,target_prefix=ros, dissect=%{time} %{msg},field=message,target_prefix=ros, rename=[{From:ros.msg To:msg}], dissect=ANOMALY_METADATA:%{data},field=msg,target_prefix=anomaly, decode_json_fields=anomaly.data, timestamp=[field=ros.time, target_field=@timestamp, timezone=UTC, layouts=[UNIX]]}: cannot override existing key with `ros.time`
2022-11-14T15:23:09.414Z        DEBUG   [processors]    processing/processors.go:128    Fail to apply processor client{rate_limit=[limit=[{1 m}],fields=[[]],algorithm=[token_bucket]], dissect=%{time} %{severity} %{node} [%{program.name}:%{program.line}(%{program.function})] [topics: %{topics}] %{msg},field=message,target_prefix=ros, dissect=%{time} %{msg},field=message,target_prefix=ros, rename=[{From:ros.msg To:msg}], dissect=ANOMALY_METADATA:%{data},field=msg,target_prefix=anomaly, decode_json_fields=anomaly.data, timestamp=[field=ros.time, target_field=@timestamp, timezone=UTC, layouts=[UNIX]]}: could not find beginning delimiter: `ANOMALY_METADATA:` in remaining: `[Client 127] Unsubscribed from /robot_ui/data/counters`, (offset: 0)
2022-11-14T15:23:09.414Z        DEBUG   [processors]    processing/processors.go:203    Publish event: {
  "@timestamp": "2022-11-14T15:23:02.510Z",
  "@metadata": {
    "beat": "filebeat",
    "type": "_doc",
    "version": "7.17.4"
  },
  "message": "1668439382.510740995 INFO [Client 127] Unsubscribed from /robot_ui/data/counters",
  "input": {
    "type": "filestream"
  },
  "host": {
    "name": "robobeat"
  },
  "agent": {
    "type": "filebeat",
    "version": "7.17.4",
    "hostname": "robobeat",
    "name": "robobeat"
  },
  "ecs": {
    "version": "1.12.0"
  },
  "log": {
    "offset": 12297393,
    "file": {
      "path": "/var/log/roslogs/5a66470a-5068-11ed-a74b-45018b8c0b2f/rosout.log"
    },
    "flags": [
      "dissect_parsing_error"
    ]
  },
  "msg": "[Client 127] Unsubscribed from /robot_ui/data/counters"
}

I get a few tens of such logs, all within the same second, but if the 1/s rate limit were working, at most 1 of them should get through per second.
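
Note that the debug line above shows fields=[[]]: we don't set the rate_limit processor's fields option, so, as I understand the docs, a single global bucket applies to all matching events, which is what we want here. For comparison, the documented fields option would instead keep a separate bucket per distinct field value, e.g. one per log file:

  processors:
    - rate_limit:
        limit: "1/s"
        fields:
          - log.file.path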

We're using version 7.17 of Elastic Cloud (Filebeat is 7.17.4, as seen in the event above).

Does anyone have any idea why this rate limiting doesn't seem to work and/or how to fix/improve it?

P.S.
Hm, it seems that if I set the rate_limit to 1/m (one message per minute instead of per second), the rate limiting does work, and I only get 1 such message per minute. But that's too restrictive.
Also, setting it to 60/m doesn't do what I want, since I'd like to limit message bursts that happen entirely within 1 second.
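
If I understand the token_bucket algorithm correctly (it's the algorithm shown in the debug output above, and I'm assuming its bucket capacity equals the configured limit), the burst behaviour of 60/m is actually expected, since only the long-term average is capped:

  limit: 60/m  ->  bucket capacity 60 tokens, refilled at 1 token per second;
                   a burst of 40 messages within 1 second finds 40 tokens, so all pass
  limit: 1/m   ->  bucket capacity 1 token, refilled every 60 seconds;
                   bursts are capped at 1 message (matches what I see)
  limit: 1/s   ->  bucket capacity 1 token, refilled every second;
                   bursts should be capped at 1 message per second, but that's not what happens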
