Custom aggregations for anomaly detection

Hello,

We are trying to detect anomalies in electricity usage using Elastic's anomaly detection.
To this end we would like to use pipeline aggregations such as derivative or serial differencing aggregations.
Moreover, since we will have multiple customers, we also would like to do these aggregations per customer.

In other words, we need a datafeed from which we can create a detector like "mean(derivative) by customer_id".

Our approach, based on this article, was to first use a terms aggregation on the customer_id, and nest the date_histogram aggregation containing the derivative aggregation inside it.
However, this returned an empty datafeed. Even after simplifying things a little, the terms aggregation always seemed to give an empty feed.

When using the /_search API, the query works just fine.
Any ideas as to what is going wrong?

Thanks!

Jorrit

Aggregation we used:

{
  "aggregations": {
    "device_id": {
      "terms": {
        "field": "device_id"
      },
      "aggregations": {
        "buckets": {
          "date_histogram": {
            "field": "@timestamp",
            "fixed_interval": "15m",
            "time_zone": "UTC"
          },
          "aggregations": {
            "@timestamp": {
              "max": {
                "field": "@timestamp"
              }
            },
            "avg_net": {
              "avg": {
                "field": "net"
              }
            },
            "net_deriv": {
              "derivative": {
                "buckets_path": "avg_net"
              }
            }
          }
        }
      }
    }
  }
}

The date_histogram aggregation in your datafeed needs to be the "top" aggregation, and your terms aggregation needs to be a sub-agg of that. By default, the terms agg only returns the top 10 instances, so if your device_id distinct count is more than 10 you'll need to set the size parameter of the terms agg. I hope the cardinality of the device_id field isn't too high, however.
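For reference, a rough sketch of the job config that would sit on top of such a datafeed (the job id and field names here just mirror your aggregation, so adjust as needed). One thing to remember: whenever a datafeed uses aggregations, the job needs summary_count_field_name set to doc_count:

PUT _ml/anomaly_detectors/net-usage
{
  "analysis_config": {
    "bucket_span": "15m",
    "detectors": [
      {
        "function": "mean",
        "field_name": "net_deriv",
        "by_field_name": "device_id"
      }
    ],
    "summary_count_field_name": "doc_count"
  },
  "data_description": {
    "time_field": "@timestamp"
  }
}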

Hi, thank you for your reply!
We tried the order you suggested, but ran into this error:

derivative aggregation [net_deriv] must have a histogram, date_histogram or auto_date_histogram as parent

This error was actually the reason we swapped the order in the first place.

About the terms agg: for now 10 is sufficient, but this number could go up to 10,000. From some googling I understood that if all terms are needed, it is better to use a composite aggregation. Is that correct?

Best,

Jorrit

Aggregation:

{
  "aggregations": {
    "buckets": {
      "date_histogram": {
        "field": "@timestamp",
        "fixed_interval": "15m",
        "time_zone": "UTC"
      },
      "aggregations": {
        "@timestamp": {
          "max": {
            "field": "@timestamp"
          }
        },
        "device_id": {
          "terms": {
            "field": "device_id",
            "size": 100
          },
          "aggregations": {
            "avg_net": {
              "avg": {
                "field": "net"
              }
            },
            "net_deriv": {
              "derivative": {
                "buckets_path": "avg_net"
              }
            }
          }
        }
      }
    }
  }
}

So, yes - you will have a problem if the cardinality is ~10,000. A composite aggregation is a solution for normal queries, but the client has to manage the pagination of the results it returns. The ML datafeed (which is technically a query client) does not know how to do this pagination.
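To illustrate why that matters (the index name, field names, and the after values here are purely placeholders), each page of a composite aggregation has to be requested explicitly by passing the after_key from the previous response, something like:

# hypothetical example: paging through device_id / 15m buckets
GET my-metrics-index/_search
{
  "size": 0,
  "aggregations": {
    "by_device": {
      "composite": {
        "size": 1000,
        "sources": [
          { "device_id": { "terms": { "field": "device_id" } } },
          { "ts": { "date_histogram": { "field": "@timestamp", "fixed_interval": "15m" } } }
        ],
        "after": { "device_id": "device-0999", "ts": 1700000000000 }
      },
      "aggregations": {
        "avg_net": { "avg": { "field": "net" } }
      }
    }
  }
}

A normal client loops, feeding each response's after_key back in until all buckets have been retrieved; the datafeed has no such loop.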

Instead - you should investigate Transforms, which use composite aggregations behind the scenes but manage the whole process for you. You would use a Transform to create an entity-centric index on the fly (in this case, you'd likely group by device_id and a date_histogram) and pick metrics as the aggregations (sum, avg, etc.). This creates a new entity-based, time-series index that the ML datafeed can query (and optionally aggregate again). Since Transforms can run continuously, you would have a chain of analysis:

Raw logs --> Transform --> New Index --> ML Job --> Results
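As a rough sketch of the Transform (the transform id, index names, interval, and delay are placeholders you'd tune to your data):

PUT _transform/net-by-device
{
  "source": { "index": "raw-logs-*" },
  "dest": { "index": "net-by-device" },
  "pivot": {
    "group_by": {
      "device_id": { "terms": { "field": "device_id" } },
      "@timestamp": { "date_histogram": { "field": "@timestamp", "fixed_interval": "15m" } }
    },
    "aggregations": {
      "avg_net": { "avg": { "field": "net" } }
    }
  },
  "sync": { "time": { "field": "@timestamp", "delay": "60s" } },
  "frequency": "1m"
}

Starting it with POST _transform/net-by-device/_start keeps the destination index up to date as new raw documents arrive (the sync block is what makes it continuous rather than a one-off batch).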

If you want this to run continuously, you would need to be careful to lag the ML job's datafeed enough (via query_delay) to make sure the upstream Transform has finished writing new records into the "New Index" before anomaly detection tries to analyze them.
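For example (the datafeed and job ids are placeholders, and the right delay depends on your Transform's frequency and sync delay), the datafeed pointing at the Transform's destination index might look like:

PUT _ml/datafeeds/datafeed-net-by-device
{
  "job_id": "net-by-device",
  "indices": ["net-by-device"],
  "query_delay": "150s"
}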

That's exactly what we needed, thanks for your help!

