Referencing field name from datafeed aggregation to use as a detector in an ML job

I created a machine learning job which uses two metrics as detectors defined as aggregations of fields in my index and everything was working great (Job_0, Datafeed_0 code below).

What I'd really like to do though is define each of those metrics based on a filter unique to each metric. However if I try to filter those fields, I seem to lose the ability to refer back to the results as field names in the job itself.

How can I create two metrics, filtered based on different criteria, and then still reference them as detectors in the ML job?

Job_0: No errors regarding referencing metric1 and metric2 as detectors:

{
  "description": "My Job",
  "analysis_config": {
    "bucket_span": "1h",
    "detectors": [
      {
        "detector_description": "Metric 1",
        "function": "mean",
        "field_name": "metric1"
      },
      {
        "detector_description": "Metric 2",
        "function": "mean",
        "field_name": "metric2"
      }
    ],
    "summary_count_field_name": "doc_count"
  },
  "model_plot_config": {"enabled": "true"},
  "data_description": {"time_field": "ts"}
}

Datafeed_0:

{
  "job_id": "my_job_id",
  "indices": ["my_index"],
  "aggs": {
    "buckets": {
      "date_histogram": {
        "field": "ts",
        "interval": "1h",
        "time_zone": "UTC"
      },
      "aggs": {
        "ts": {"max": {"field": "ts"}},
        "metric1":{"value_count":{"field":"my_field"}},
        "metric2":{"sum":{"field":"my_other_field"}}
      }
    }
  }
}

Result:

{
    "metric1_value": 3509,
    "metric2_value": 58613,
    "ts": 1544425187000,
    "doc_count": 3509
}

Even though I can run this query, and it returns the data I want, I can't seem to insert it into the datafeed:

{
  "aggs": {
    "ts": {"max": {"field": "ts"}},
    "metric1": {
        "filter":{"term": {"my_filter_field": "A" } },
        "aggs" : {
            "metric1_value" : { "value_count" : { "field" : "my_field" } }
        }
    },
    "metric2": {
        "filter":{"term": {"my_filter_field": "B" } },
        "aggs" : {
            "metric2_value" : { "value_count" : { "field" : "my_other_field" } }
        }
    }
  }
} 

When I try that in the Datafeed_1 on Job_0 I get:

  {
    "type": "illegal_argument_exception",
    "reason": "Unsupported aggregation type [metric1]"
  }

Datafeed_1:

{
  "job_id": "my_job_id",
  "indices": ["my_index"],
  "aggs": {
    "buckets": {
      "date_histogram": {
        "field": "ts",
        "interval": "1h",
        "time_zone": "UTC"
      },
      "aggs": {
        "ts": {"max": {"field": "ts"}},
        "metric1":{
          "filter":{"term":{"my_filter_field":"A"}},
          "aggs":{
            "metric1_value":{"value_count":{"field":"my_field"}}
          }
        },
        "metric2":{
          "filter":{"term":{"my_filter_field":"B"}},
          "aggs":{
            "metric2_value":{"sum":{"field":"my_other_field"}}
          }
        }
      }
    }
  }
}

If I try to reference the field name inside the aggregation ("metric1_value") instead, I don't get any error, and the job runs, but the metric value is nowhere to be found in the results:

  {
    "ts": 1544425187000,
    "doc_count": 3509
  }

How can I filter two separate metrics in different ways, but then access the inner level of the result document that actually has the values and pass that to the detector field? If the metrics are like "ts" below, I can just reference "ts" and it works. How do I reference "metric1_value"?

  "aggregations": {
    "metric1": {
      "doc_count": 0,
      "metric1_value": {
        "value": 0
      }
    },
    "metric2": {
      "doc_count": 0,
      "metric2_value": {
        "value": 0
      }
    },
    "ts": {
      "value": 1544658335000,
      "value_as_string": "1544658335"
    }
  }

The job is looking at metric1 for its data, which is a filter aggregation. I think you want it to look at metric1_value, which is the value count aggregation inside the filter. Similarly for metric2.

The new job config should probably be:

{
  "description": "My Job",
  "analysis_config": {
    "bucket_span": "1h",
    "detectors": [
      {
        "detector_description": "Metric 1",
        "function": "mean",
        "field_name": "metric1_value"
      },
      {
        "detector_description": "Metric 2",
        "function": "mean",
        "field_name": "metric2_value"
      }
    ],
    "summary_count_field_name": "doc_count"
  },
  "model_plot_config": {"enabled": "true"},
  "data_description": {"time_field": "ts"}
}

I tested on some other data to verify, I was able to replicate your error, but then it was rectified when I adjusted by aggregation names (or the job detector names).

BenTrent - thanks for the response.

I concur that the error is eliminated when I make the changes you suggested, but even with the error resolved, the actual results are still problematic.

If metric1 is just a single value aggregation like this:

     "metric1":{"value_count":{"field":"my_field"}},

Then when I run the machine learning job preview, I see that value, labeled "metric1" in the result.

GET _xpack/ml/datafeeds/my_job_id/_preview

  {
    "metric1": 0,
    "ts": 1544425187000,
    "doc_count": 3509
  }

However, when I make the changes you suggested, I don't get an error, but I also don't get a value for the metric coming from the preview. New metric1 aggregation:

"metric1":{
  "filter":{"term":{"my_filter_field":"A"}},
  "aggs":{
    "metric1_value":{"value_count":{"field":"my_filter_field"}}
  }

Referencing metric1_value in the job:

"detectors": [
  {
    "detector_description": "Metric 1",
    "function": "mean",
    "field_name": "metric1_value"
  }
],

No errors, but no field in the preview labeled "metric1_value" it's like it's gone:

  {
    "ts": 1544425187000,
    "doc_count": 3509
  }

When I open the ML job, it's just blank. Also, I can run just the aggregation query, and it returns results, so there are results to return it's not a "null" result or something:

GET my_index-*/_search
{
  "size":0,
  "aggs": {
    "metric1":{
      "filter":{"term":{"my_filter_field":"A"}},
      "aggs":{
        "metric1_value":{"value_count":{"field":"my_filter_field"}}
      }
    }
  }
} 

Returns:

  "hits": {
    "total": 20577359,
    "max_score": 0,
    "hits": []
  },
  "aggregations": {
    "metric1": {
      "doc_count": 2137037,
      "metric1_value": {
        "value": 2137037
      }
    }
  }

You are absolutely correct, sorry about that :(. I just looked at the source code and it seems that we ignore single_bucket aggregations. So, the filter aggregation is not even searched for sub values.

So, it seems the two solutions right now are:

  • Create two jobs that have datafeeds that search for the appropriate fields (probably the most correct option)
  • Partition across my_filter_field and have both use the same detectors
  • Figure out a more complicated aggregation that utilizes a multi_bucket aggregation

What I could figure out as a more complicated option using a multi-bucket aggs + pipeline aggs is the following.

Basically, I am using filters with only one filter (thus only one bucket) so that we can have a max bucket pipeline aggregation (which only supports multi-bucket) and then the pipeline aggregation name can simply be the detector field name (metric1_value and matric2_value in this specific case):

{
  "job_id": "my_job_id",
  "indices": ["my_index"],
  "aggs": {
    "buckets": {
      "date_histogram": {
        "field": "ts",
        "interval": "1h",
        "time_zone": "UTC"
      },
      "aggs": {
        "ts": {"max": {"field": "ts"}},
        "metricBucketsA":{
          "filters":{"filters":[ {"match" : { "my_filter_field" : "A"   }}],
          "aggs":{
            "metric":{"value_count":{"field":"my_field"}}
          }
        },
        "metricBucketsB":{
          "filters":{"filters":[ {"match" : { "my_filter_field" : "B"   }}],
          "aggs":{
            "metric":{"sum":{"field":"my_other_field"}}
          }
        },
        "metric1_value": {
          "max_bucket": {
             "buckets_path": "metricBucketsA.metric"
           }
        },
       "metric2_value": {
          "max_bucket": {
             "buckets_path": "metricBucketsB.metric"
           }
        }
      }
    }
  }
}

All that said, we do support partitioning over a field, and then having numerous different detectors for those partitions.

BenTrent -

Thanks for the addendum. I suspected I had run into the a limitation in datafeeds I had seen some documentation saying it only had support for limited aggregations.

My actual use case was an ML job running on the "failure rate" of certain user-actions on our website. We record a "status" field which has the value "F" if the action failed and several other values if it didn't. So I wanted to do separate counts of fails, of total actions, and then compute the ratio in a bucket script to get the failure rate and run the ML job on that. Without the ability to do separate filters though I wasn't able to compute the count of the filtered field and the unfiltered one separately.

Because I wanted to work with an operation on two metric values, they had to be in the same job.

Because I only cared about one particular status and none of the others partitioning over the status field didn't make sense.

However, your third solution looks like it might get me there, and if it doesn't, thanks for clearing up what's possible and what's not at least I won't have to keep banging my head on it.

edit:

Confirming your solution did work on my data:

  {
    "ts": 1544425187000,
    "metric1_value": 235,
    "doc_count": 3509
  },
1 Like

Well, it almost worked.

This query works:

GET member_survey_response-*/_search
{
  "size":0,
  "aggs": {
    "ts": {"max": {"field": "ts"}},
    "metric1":{
      "filters":{
          "filters":[ {
              "match" : { "my_filter_field" : "A"   }
          }]
      },
      "aggs":{
        "metric1_value":{"value_count":{"field":"my_field"}}
      }
    },
    "metric2":{
      "filters":{
          "filters":[ {
              "wildcard" : { "my_filter_field" : "*B*"   }
          }]
      },
      "aggs":{
        "metric2_value":{"value_count":{"field":"my_other_field"}}
      }
    },
    "metric1_final": {
      "max_bucket": {
         "buckets_path": "metric1.metric1_value"
       }
    },
    "metric2_final": {
      "max_bucket": {
         "buckets_path": "metric2.metric2_value"
       }
    }
  }
} 

However when used in the datafeed, the preview returns the following error:

{
  "error": {
    "root_cause": [
      {
        "type": "illegal_argument_exception",
        "reason": "Multiple bucket aggregations at the same level are not supported"
      }
    ],
    "type": "illegal_argument_exception",
    "reason": "Multiple bucket aggregations at the same level are not supported"
  },
  "status": 400
}

Seems like the datafeed doesn't like doing multiple aggregations like this.

@elasticitous Sorry about that :(,

Seeing your use case of "failure rate" it seems to me that you want to know the percentage of items per date_histogram bucket that are failed, correct? If so, this should be more straight forward than how I was making it :).

    {
    "aggregations" : {
            "buckets" : {
              "date_histogram" : {
                "field" : "ts",
                "time_zone" : "UTC",
                "interval" : "1h"
              },
              "aggregations" : {
                "ts" : {
                  "max" : {
                    "field" : "ts"
                  }
                },
                "failed_count" : {
                  "filter": {
                        "term": {
                          "my_field": "FAILED_STATUS"
                        }
                      }
                },
                "percent_failed": {
                  "bucket_script": {
                    "buckets_path": {
                      "total_count": "_count", //This is the total count in the histogram bucket
                      "failure_count": "failed_count._count"
                    },
                    "script": "params.failureCount / params.totalCount"
                  }
                }
              }
            }
          }
    }

Then run your job against the percent_failed field.

Now, if you have a bunch of different actions you want to handle separately, you will want to partition the job on that action_type field and create a Multi-Metric job. This will dynamically split the the job so that you now have anomalous checks for failed_percentage for each individual action type. If you only care about specific action types, you can add a query to the datafeed specifying such.

BenTrent - really appreciate the responses. Just to get explicit about the use case:

The use case is a failure rate, but it's not the simple case you describe. We're getting data from multiple sources, and putting it all into the same index, but it's not uniform data, so the "Total" so to speak isn't the raw document count it has to be defined source-by-source based on a wildcard search on another field.

So in plain language:

FinalMetric = (Count where field1 = value1) / (Count where field2 is like "Source1*")

And field2 is data like "Source1-aoiejfoeijfiowejfio", "Source1-ogmoijgioajoifei" etc etc. The plan was to split the job over these sources or just run one-per-source if I couldn't get that working.

Sources can share the "action_type" but the total has to be by source. I was avoiding just using a query clause to filter the entire datafeed by field2, but if I have access to the raw document count in that filter aggregation then maybe I can just use a query clause in the datafeed to filter by source, and then count the action_type terms as you described.

I'll give it a try tomorrow.

If I do it like this, am I pipe-lining the first filter into the second, and only getting the subset that match both filters?

{
"aggregations" : {
        "buckets" : {
          "date_histogram" : {
            "field" : "ts",
            "time_zone" : "UTC",
            "interval" : "1h"
          },
          "aggregations" : {
            "ts" : {
              "max" : {
                "field" : "ts"
              }
            },
            "failed_count" : {
              "filter": {
                    "term": {
                      "my_field": "FAILED_STATUS"
                    }
                  }
            },
            "total_count" : {
              "filter": {
                    "term": {
                      "my_other_field": "TOTAL_STATUS"
                    }
                  }
            },
            "percent_failed": {
              "bucket_script": {
                "buckets_path": {
                  "total_count": "total_count._count",
                  "failure_count": "failed_count._count"
                },
                "script": "params.failureCount / params.totalCount"
              }
            }
          }
        }
      }
}

It's also possible I can use the "agg._count" syntax and get the original code working, because the value and the doc count were always the same anyway when it ran but I couldn't access the sub-agg value which was the original problem.

Thanks again for all the help.

1 Like

I raised an issue to support handling single bucket aggregations. I think this is a great example of a datafeed use case that we should handle

Oh wow, greatly appreciated.

The general use case for me is an index with data from various business segments, who each have KPIs defined slightly differently, with the intent to run business-segment specific jobs, necessitating defining counts, sums, etc slightly differently based on filters unique to the business segment.

This would make it much simpler to do what seems conceptually very simple and is at least for queries.

And thanks again BenTrent for all the workarounds.

1 Like

This topic was automatically closed 28 days after the last reply. New replies are no longer allowed.