How to query for anomalous state changes based on duration

I'm trying to determine if there is a way that I can query an Elasticsearch index for a particular data anomaly we sometimes encounter.

Here is a contrived example of the dataset (with some annotations):

[
  {
    "_id": 1,
    "asset": "foo",
    "recorded_date": "2020-09-11T00:00:00.000Z",     # start the "clock"
    "recorded_ts": 1599782400,
    "state": "on"
  },
  {
    "_id": 2,
    "asset": "foo",
    "recorded_date": "2020-09-11T02:00:00.000Z",     # 2 hours <= 3 hours (ignored)
    "recorded_ts": 1599789600,
    "state": "off"
  },
  {
    "_id": 3,
    "asset": "foo",
    "recorded_date": "2020-09-11T04:00:00.000Z",     # start the "clock"
    "recorded_ts": 1599796800,
    "state": "on"
  },
  {
    "_id": 4,
    "asset": "baz",
    "recorded_date": "2020-09-11T08:30:00.000Z",     # baz != foo
    "recorded_ts": 1599813000,
    "state": "off"
  },
  {
    "_id": 5,
    "asset": "foo",
    "recorded_date": "2020-09-11T14:30:00.000Z",     # 10.5 hours > 3 hours (anomaly detected!)
    "recorded_ts": 1599834600,
    "state": "off"
  },
  {
    "_id": 6,
    "asset": "foo",
    "recorded_date": "2020-09-11T20:00:00.000Z",     # immediately-preceding foo state != on (ignored)
    "recorded_ts": 1599854400,
    "state": "booting"
  },
  {
    "_id": 7,
    "asset": "foo",
    "recorded_date": "2020-09-11T23:30:00.000Z",     # immediately-preceding foo state != on (ignored)
    "recorded_ts": 1599867000,
    "state": "off"
  }
]

In a nutshell, I want to count how frequently during a specific time range (e.g. 2020-09-11T00:00:00Z to 2020-09-12T00:00:00Z) a particular asset (e.g. foo) has a state equal to on for more than 3 hours.

(Imagine foo as a widget that could potentially overheat if it is on longer than 3 hours.)

Given the aforementioned sample dataset, the query would detect 1 anomaly for asset foo.

In thinking about the problem space I've come up with some criteria in an attempt to help describe what constitutes an anomaly:

  1. The current document must not have a state equal to on

  2. The current document must have a sequentially preceding document within the same recorded_ts period, with the same asset value and a state equal to on
    a. The preceding document and the current document could potentially be non-consecutive in order (i.e. records for other assets interleaved in the dataset)
    b. Any preceding document outside of the recorded_ts period bounds should simply be ignored, and not count towards any anomaly counters

  3. The time difference between the recorded_ts fields of the current document and the preceding document must be more than 3 hours
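
To pin down the semantics, here is how I would express the criteria above in plain Python, outside Elasticsearch entirely (the function name and threshold constant are arbitrary, just for illustration):

```python
THRESHOLD_S = 3 * 60 * 60  # "more than 3 hours", in seconds

def count_anomalies(docs, asset, start_ts, end_ts):
    """Count documents that left the "on" state more than THRESHOLD_S
    seconds after the preceding "on" record for the given asset."""
    # Criterion 2b: only records inside the queried period count;
    # interleaved records for other assets (2a) are filtered out here too.
    window = sorted(
        (d for d in docs
         if d["asset"] == asset and start_ts <= d["recorded_ts"] < end_ts),
        key=lambda d: d["recorded_ts"],
    )
    count = 0
    for prev, cur in zip(window, window[1:]):
        if (cur["state"] != "on"                   # criterion 1
                and prev["state"] == "on"          # preceding state must be "on"
                and cur["recorded_ts"] - prev["recorded_ts"] > THRESHOLD_S):  # criterion 3
            count += 1
    return count
```

Run against the sample dataset with the range 1599782400 to 1599868800, this reports 1 anomaly for foo and 0 for baz, matching the annotations.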

It feels like a "windowing operation" to me, partitioned by asset and ordered by recorded_ts. However, I'm not sure if there is a way to achieve this with the standard DSL. (Perhaps it can be solved with a histogram and aggs?) In reading the docs I'm not able to wrap my mind around how to compose the query.

Apparently, I'm a bit "temporally-challenged", so hopefully someone in the community who enjoys puzzles can help shed some light? :sunny:

Thank you!

I think you can solve this by using a combination of query and aggregation:

  • range filter to narrow search to your time range
  • sorted descending by recorded_ts, if you have 1 shard
  • a composite aggregation with source (conceptually a group_by/pivot) by asset
    • a scripted metric aggregation as part of the composite aggregation to implement your custom logic
    • if you have more than 1 shard, you need to collect all the required data and implement everything in the reducer phase, because the reducer is the only place that sees all the data. If you have 1 shard it's easier: you can implement everything in the mapper, and your 1st document is the last/current state, so you can drop incomplete sessions quickly. For more than 1 shard you need to sort again before you start analyzing
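
Schematically, the request body would look something like this (written here as a Python dict; a sketch only, assuming asset is a keyword field and recorded_ts is numeric, and with the Painless reducer body left as a placeholder you would have to fill in):

```python
# Range filter + composite "group by asset" with a scripted_metric
# sub-aggregation carrying the custom session logic.
request_body = {
    "size": 0,
    "query": {
        # narrow the search to the time range of interest
        "range": {"recorded_ts": {"gte": 1599782400, "lt": 1599868800}},
    },
    "aggs": {
        "by_asset": {
            # conceptually a group_by/pivot on asset
            "composite": {
                "sources": [{"asset": {"terms": {"field": "asset"}}}],
            },
            "aggs": {
                "anomalies": {
                    "scripted_metric": {
                        "init_script": "state.events = [];",
                        # per shard: collect (timestamp, state) pairs
                        "map_script": (
                            "state.events.add(['ts': doc['recorded_ts'].value, "
                            "'state': doc['state'].value]);"
                        ),
                        "combine_script": "return state.events;",
                        # placeholder: merge the shard lists, sort by ts,
                        # walk consecutive pairs, count off-after-on gaps > 3h
                        "reduce_script": "/* merge, sort, walk pairs, count gaps > 3h */",
                    },
                },
            },
        },
    },
}
```

The map/combine scripts only gather raw events; all the ordering-sensitive logic lives in the reduce script, which is what makes this shard-safe.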

The above requires Painless skills, as in Painless, the scripting language (being painless helps, too).

If you want to do such an analysis regularly, or you expect to do several iterations, it might be useful to use a transform, e.g. you can continuously pivot your source by asset, so you have all data about it in 1 document. Think of it as the counterpart of a materialized view in SQL. You can even combine transform and Watcher, or transform and ML.
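
As an illustration, a continuous transform body pivoting by asset could look roughly like this (again as a Python dict; the index names, sync delay, and the simple max aggregation are placeholders I made up, you would substitute your own aggregations):

```python
# Sketch of a continuous transform: one output document per asset,
# kept up to date as new state-change documents arrive.
transform_body = {
    "source": {"index": "asset-states-*"},      # hypothetical source pattern
    "dest": {"index": "assets_by_asset"},       # hypothetical destination
    "pivot": {
        "group_by": {"asset": {"terms": {"field": "asset"}}},
        "aggregations": {
            # placeholder aggregation; real logic would go here,
            # e.g. a scripted_metric like the one sketched above
            "last_seen": {"max": {"field": "recorded_ts"}},
        },
    },
    # makes the transform continuous rather than a one-off batch
    "sync": {"time": {"field": "recorded_date", "delay": "60s"}},
}
```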

Last but not least, you might want to check out EQL. I am not sure you can solve your problem with EQL as of now, but as we improve EQL, this might be a solution long term.


@Hendrik_Muhs Thank you for the detailed reply! This definitely helps me narrow down some options. :slight_smile:

It's possible that the filter on recorded_ts could span multiple shards. (I have an ILM policy set to roll over weekly, or when a shard gets to ~50 GB in size, which occasionally happens.) So, it sounds like I should take a closer look at Painless and implement everything in the reducer phase.

Again, thank you!