Including Metricbeat Custom Field in Watch

Hello -

I have the following Watch which triggers when Metricbeat shows system.cpu.total.pct is over 75%.

PUT _watcher/watch/testwatch
{
  "trigger": {
    "schedule": {
      "interval": "10s"
    }
  },
  "metadata": {
    "threshold": 75
  },
  "input": {
    "search": {
      "request": {
        "search_type": "query_then_fetch",
        "indices": [
          "metricbeat-7.9.1*"
        ],
        "rest_total_hits_as_int": true,
        "body": {
          "size": 0,
          "query": {
            "bool": {
              "filter": [
                {
                  "range": {
                    "@timestamp": {
                      "gte": "now-5m",
                      "lt": "now"
                    }
                  }
                },
                {
                  "exists": {
                    "field": "system.cpu.total.pct"
                  }
                }
              ]
            }
          },
          "aggs": {
            "Hosts": {
              "terms": {
                "field": "agent.hostname",
                "size": 10
              },
              "aggs": {
                "CPU": {
                  "avg": {
                    "script": {
                      "source": "(doc['system.cpu.total.pct'].value*100)"
                    }
                  }
                },
                "cpu_bucket_filter": {
                  "bucket_selector": {
                    "buckets_path": {
                      "cpu_avg": "CPU"
                    },
                    "script": "params.cpu_avg > {{ctx.metadata.threshold}}"
                  }
                }
              }
            }
          }
        }
      }
    }
  },
  "condition": {
    "script": {
      "source": "!ctx.payload.aggregations.Hosts.buckets.isEmpty()",
      "lang": "painless"
    }
  },
  "actions": {
    "log": {
      "logging": {
        "level": "info",
        "text": "The following host(s) have high CPU values over {{ctx.metadata.threshold}}% for the past 5 minutes:{{#ctx.payload.hosts}} {{device}}:{{cpu}}% {{/ctx.payload.hosts}}"
      }
    }
  },
  "transform": {
    "script": {
      "source": """
        DecimalFormat df = new DecimalFormat("0.##");
        def hosts = [];
        for (bucket in ctx.payload.aggregations.Hosts.buckets) {
          hosts.add([
            'device': bucket.key,
            'cpu': df.format(bucket.CPU.value)
          ]);
        }
        return ['hosts': hosts];
            """,
      "lang": "painless"
    }
  }
}

I would like to modify this Watch to include a Custom Field that I have in my Metricbeat configuration. My Custom Fields in Metricbeat are:

fields_under_root: true
fields:
   product: someProduct
   role: someRole
   env: prd

Could someone please walk me through the process of how I would add "product" to my logging action?

Thanks,
Butch

Hey,

You could use the top_hits aggregation to return a single document per agent.hostname terms bucket, then extract the field from that document and reuse it in your action.

Hope that helps as a start!

Thanks Alexander,

This isn't clicking for me yet. I tried the following but it's throwing an error:

       "aggs": {
          "top-Products": {
              "terms": {
                     "field": "product",
                     "size": 10
               },
          "aggs": {
            "Hosts": {
              "terms": {
                "field": "agent.hostname",
                "size": 10
              },
              "aggs": {
                "CPU": {
                  "avg": {
                    "script": {
                      "source": "(doc['system.cpu.total.pct'].value*100)"
                    }
                  }
                },
                "cpu_bucket_filter": {
                  "bucket_selector": {
                    "buckets_path": {
                      "cpu_avg": "CPU"
                    },
                    "script": "params.cpu_avg > {{ctx.metadata.threshold}}"
                  }
                }
              }
              }
            }
            }
          }
        }
      }
    }
  },

Unfortunately I'm working on an air-gap network so I can't send out the full watcher results but the error starts with:

"exception" : {
    "type": "script_exception",
    "reason": "runtime error",
    "script_stack": [
       "!ctx.payload.aggregations.Hosts.buckets.isEmpty()",
       "                               ^---- HERE"

Any additional details you could provide would be greatly appreciated.

Thanks,
Butch

The sample you provided does not use the top_hits aggregation; it changes the structure of the whole response instead. The idea is to put the top_hits aggregation under the Hosts agg, so that you retrieve a single hit for each agent.hostname bucket.
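
To make the structural change concrete, here is a trimmed, illustrative sketch of the response shape that the nested attempt would produce. The bucket keys and numbers are made up, and fields such as doc_count_error_upper_bound are omitted. Hosts now sits inside each top-Products bucket rather than directly under aggregations, which is most likely why the condition's ctx.payload.aggregations.Hosts.buckets lookup fails at runtime:

```json
{
  "aggregations": {
    "top-Products": {
      "buckets": [
        {
          "key": "someProduct",
          "doc_count": 30,
          "Hosts": {
            "buckets": [
              { "key": "host-01", "doc_count": 30, "CPU": { "value": 82.5 } }
            ]
          }
        }
      ]
    }
  }
}
```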

I made the following change but obviously I still have a syntax error.

I have:

          "aggs": {
            "Hosts": {
              "terms": {
                "field": "agent.hostname",
                "size": 10
              },
              "top-hosts": {
                  "top_hits": {}
              },
              "aggs": {
                "CPU": {
                  "avg": {
                    "script": {
                      "source": "(doc['system.cpu.total.pct'].value*100)"
                    }
                  }
                },
                "cpu_bucket_filter": {
                  "bucket_selector": {
                    "buckets_path": {
                      "cpu_avg": "CPU"
                    },
                    "script": "params.cpu_avg > {{ctx.metadata.threshold}}"
                  }
                }
              }
            }
          }
        }
      }
    }
  },

Now I get the following error:

"type": "parsing_exception",
"reason": "Found two aggregation type definitions in [Hosts]: [terms] and [top-hosts]"

Thanks,
Butch

I think I'm getting closer here but now I have a "Duplicate key" error.

          "aggs": {
            "Hosts": {
              "terms": {
                "field": "agent.hostname",
                "size": 10
              },
              "aggs": {
                "Product": {
                  "top_hits": {
                    "_source": "product",
                    "size": 1
                  }
                }
              },
              "aggs": {
                "CPU": {
                  "avg": {
                    "script": {
                      "source": "(doc['system.cpu.total.pct'].value*100)"
                    }
                  }
                },
                "cpu_bucket_filter": {
                  "bucket_selector": {
                    "buckets_path": {
                      "cpu_avg": "CPU"
                    },
                    "script": "params.cpu_avg > {{ctx.metadata.threshold}}"
                  }
                }
              }
            }
          }

The specific error is:

"type": "json_parse_exception",
"reason": "Duplicate field 'aggs'\n at [Source: (org.elasticsearch.common.bytes.AbstractBytesReference$MarkSupportingStreamInputWrapper); line: 35, column: 13]"

In the Console the error is pointing to the CPU aggs.

Thanks for your help,
Butch

try

"Hosts" : {
  "terms" : { ...},
   "aggs" : {
      "Product" : { ... },
      "CPU" : { ... }
   }
}

Put all sub-aggregations inside a single aggs object. With two aggs keys at the same level, the JSON is considered invalid.
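
Expanded with the aggregation bodies from the earlier posts, a sketch of the merged structure could look like the following. The names Product, CPU and cpu_bucket_filter come from those attempts; the outermost braces simply stand in for the body of the search request:

```json
{
  "aggs": {
    "Hosts": {
      "terms": {
        "field": "agent.hostname",
        "size": 10
      },
      "aggs": {
        "Product": {
          "top_hits": {
            "_source": "product",
            "size": 1
          }
        },
        "CPU": {
          "avg": {
            "script": {
              "source": "(doc['system.cpu.total.pct'].value*100)"
            }
          }
        },
        "cpu_bucket_filter": {
          "bucket_selector": {
            "buckets_path": {
              "cpu_avg": "CPU"
            },
            "script": "params.cpu_avg > {{ctx.metadata.threshold}}"
          }
        }
      }
    }
  }
}
```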

Thanks Alexander,

This appears to be working!

For the benefit of others, here's the full Watcher which pulls a Custom Field from Metricbeat and uses it in the action.

PUT _watcher/watch/testwatch
{
  "trigger": {
    "schedule": {
      "interval": "10s"
    }
  },
  "metadata": {
    "threshold": 75
  },
  "input": {
    "search": {
      "request": {
        "search_type": "query_then_fetch",
        "indices": [
          "metricbeat-7.9.1*"
        ],
        "rest_total_hits_as_int": true,
        "body": {
          "size": 0,
          "query": {
            "bool": {
              "filter": [
                {
                  "range": {
                    "@timestamp": {
                      "gte": "now-5m",
                      "lt": "now"
                    }
                  }
                },
                {
                  "exists": {
                    "field": "system.cpu.total.pct"
                  }
                }
              ]
            }
          },
          "aggs": {
            "Hosts": {
              "terms": {
                "field": "agent.hostname",
                "size": 100
              },
              "aggs": {
                "Product": {
                  "top_hits": {
                    "_source": "product",
                    "size": 1
                  }
                },
                "CPU": {
                  "avg": {
                    "script": {
                      "source": "(doc['system.cpu.total.pct'].value*100)"
                    }
                  }
                },
                "cpu_bucket_filter": {
                  "bucket_selector": {
                    "buckets_path": {
                      "cpu_avg": "CPU"
                    },
                    "script": "params.cpu_avg > {{ctx.metadata.threshold}}"
                  }
                }
              }
            }
          }
        }
      }
    }
  },
  "condition": {
    "script": {
      "source": "!ctx.payload.aggregations.Hosts.buckets.isEmpty()",
      "lang": "painless"
    }
  },
  "actions": {
    "log": {
      "logging": {
        "level": "info",
        "text": "The following host(s) have high CPU values over {{ctx.metadata.threshold}}% for the past 5 minutes:{{#ctx.payload.hosts}} {{device}}:{{product}}:{{cpu}}% {{/ctx.payload.hosts}}"
      }
    }
  },
  "transform": {
    "script": {
      "source": """
        DecimalFormat df = new DecimalFormat("0.##");
        def hosts = [];
        for (bucket in ctx.payload.aggregations.Hosts.buckets) {
          hosts.add([
            'device': bucket.key,
            'product': bucket.Product.hits.hits.0._source.product,
            'cpu': df.format(bucket.CPU.value)
          ]);
        }
        return ['hosts': hosts];
            """,
      "lang": "painless"
    }
  }
}
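
For anyone wondering about the bucket.Product.hits.hits.0._source.product path in the transform: each Hosts bucket in the payload should look roughly like the trimmed sketch below. The hostname and numbers are made up for illustration, and the metadata fields of the hit are omitted. The top_hits aggregation returns its documents under hits.hits, so the first entry's _source carries the custom field:

```json
{
  "key": "host-01",
  "doc_count": 30,
  "Product": {
    "hits": {
      "hits": [
        { "_source": { "product": "someProduct" } }
      ]
    }
  },
  "CPU": { "value": 82.5 }
}
```

With a bucket like this, the logging action would print an entry along the lines of host-01:someProduct:82.5%.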

Thanks again for your help,
Butch
