Well, there are probably many ways this could be solved, but here's one approach - see example:
- Run once per day, looking over the last 24 hours (the `range` in the example needs to be modified here because the example uses old data, not live data)
- Filter your query by `job_id` and `result_type:record`
- Do a `terms` aggregation on the partition field
- Do a `date_histogram` sub-aggregation with an interval that matches the `bucket_span` of the job (the example shown had a `1m` bucket span due to the data set being used, in order to have consecutive anomalous buckets, so you would need to change it to `15m`)
- Use the `moving_fn` aggregation to invoke a 3-bucket sliding window sum of the `record_score`
- Use a `bucket_selector` aggregation to eliminate any individual values where the `record_score` is below some arbitrary value (I chose `40`)
- The `condition` script loops through and finds if any 3-bucket sliding window sum of the `record_score` is greater than some arbitrary value (I chose `120`)
- In the `actions` section, gather up all of the partitions that violated the threshold and print them with the latest timestamp at which they violated it (obviously use your preferred action method)
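The search/aggregation portion of those steps could be sketched roughly like this. This is just an illustrative skeleton, not the full watch: the job ID `my_job` is a placeholder, the `terms` aggregation assumes the standard `partition_field_value` field on ML record results, and the `15m` interval and `40` threshold come from the discussion above.

```json
{
  "size": 0,
  "query": {
    "bool": {
      "filter": [
        { "term": { "job_id": "my_job" } },
        { "term": { "result_type": "record" } },
        { "range": { "timestamp": { "gte": "now-24h" } } }
      ]
    }
  },
  "aggs": {
    "by_partition": {
      "terms": { "field": "partition_field_value", "size": 100 },
      "aggs": {
        "over_time": {
          "date_histogram": { "field": "timestamp", "fixed_interval": "15m" },
          "aggs": {
            "max_score": { "max": { "field": "record_score" } },
            "keep_high_scores": {
              "bucket_selector": {
                "buckets_path": { "score": "max_score" },
                "script": "params.score > 40"
              }
            },
            "rolling_sum": {
              "moving_fn": {
                "buckets_path": "max_score",
                "window": 3,
                "script": "MovingFunctions.sum(values)"
              }
            }
          }
        }
      }
    }
  }
}
```

The `condition` script would then walk `by_partition.buckets[*].over_time.buckets[*].rolling_sum.value` looking for any value above `120`, and the `actions` section would report the partition keys and timestamps that matched.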
An example output is:
```
Anomalies:
==========
AAL had 3 anomalies in a row at 2021-02-10T12:32:00.000Z
AWE had 3 anomalies in a row at 2021-02-10T19:19:00.000Z
AMX had 3 anomalies in a row at 2021-02-10T22:10:00.000Z
```