Watcher condition with aggregation

Hello, I'm trying to create a watcher alert that fires when any RabbitMQ queue exceeds X unacknowledged messages. My query returns the results I'm looking for, but the condition usually throws a null pointer exception. Any help is appreciated!

  {
  "trigger": {
    "schedule": {
      "interval": "5m"
    }
  },
  "input": {
    "search": {
      "request": {
        "search_type": "query_then_fetch",
        "indices": [
          "metricbeat-*"
        ],
        "rest_total_hits_as_int": true,
        "body": {
          "size": 0,
          "query": {
            "bool": {
              "filter": [
                {
                  "range": {
                    "@timestamp": {
                      "gte": "now-3h"
                    }
                  }
                }
              ]
            }
          },
          "aggs": {
            "node_name": {
              "terms": {
                "field": "rabbitmq.node.name.keyword",
                "size": "2"
              },
              "aggs": {
                "queue_name": {
                  "terms": {
                    "field": "rabbitmq.queue.name.keyword",
                    "size": "100"
                  },
                  "aggs": {
                    "get_latest": {
                      "terms": {
                        "field": "@timestamp",
                        "size": 1,
                        "order": {
                          "_key": "desc"
                        }
                      },
                      "aggs": {
                        "unack_count": {
                          "terms": {
                            "field": "rabbitmq.queue.messages.unacknowledged.count"
                           }
                        }
                      }
                    }
                  }
                }
              }
            }
          }
        }
      }
    }
  },
  "condition": {
    "script": {
      "source": "ArrayList arr = ctx.payload.aggregations.node_name.buckets; for (int i = 0; i < arr.length; i++) { if (arr[i]['unack_count'].value > 1)  { return true; }} return false;",
      "lang": "painless"
    }
  },
  "actions": {[...]

Hi there, welcome to the forum! My guess is that this error is thrown because unack_count is null on one of the buckets. Could you share the error you're seeing?

Here's one way you can check for null before trying to read the value, which will skip over the buckets that would otherwise throw the null pointer exception.

ArrayList arr = ctx.payload.aggregations.node_name.buckets;
for (int i = 0; i < arr.length; i++) {
  if (arr[i].contains('unack_count') && arr[i]['unack_count'].value > 1)  {
    return true;
  }
}
return false;
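For illustration, here is a small Python model (not Painless) of what the original lookup does. It assumes a node_name bucket shaped like the nested terms response shown further down in this thread; the keys "rabbit@node1" and "orders" are made-up placeholders. Python's dict.get stands in for Painless's map['key'], which returns null for a missing key, and indexing into that None stands in for calling .value on null.

```python
# Hypothetical node_name bucket (placeholder values). In the real response,
# unack_count is nested under queue_name > get_latest, not at this level.
node_bucket = {
    "key": "rabbit@node1",
    "doc_count": 100,
    "queue_name": {"buckets": [{"key": "orders", "doc_count": 100}]},
}


def read_unack_value(bucket):
    """Mimic arr[i]['unack_count'].value from the original condition."""
    unack = bucket.get("unack_count")  # like Painless map['unack_count'] -> null
    return unack["value"]              # like .value on null -> fails


def read_unack_value_safe(bucket):
    """Null-safe variant: skip the bucket instead of failing."""
    unack = bucket.get("unack_count")
    if unack is None:
        return None
    return unack.get("value")


print(node_bucket.get("unack_count"))     # None: the key is not at this level
print(read_unack_value_safe(node_bucket))  # None: no crash, but nothing found
try:
    read_unack_value(node_bucket)
except TypeError as exc:  # Python's analogue of the NullPointerException
    print("fails:", type(exc).__name__)
```

The null check stops the crash, but it also means every node_name bucket is skipped, so the condition can never become true at that level.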

Thanks for the warm welcome @cjcenizal! Of course, here's the error I receive:

 "exception": {
    "type": "script_exception",
    "reason": "runtime error",
    "script_stack": [
      "if (arr[i]['unack_count'].value > 1) { ",
      "                         ^---- HERE"
    ],
    "script": "ArrayList arr = ctx.payload.aggregations.node_name.buckets; for (int i = 0; i < arr.length; i++) { if (arr[i]['unack_count'].value > 1) { if (arr[i]['unack_count'] != null) { return true; }}} return false;",
    "lang": "painless",
    "caused_by": {
      "type": "null_pointer_exception",
      "reason": null,
      "stack_trace": "java.lang.NullPointerException\n\tat org.elasticsearch.painless.DefBootstrap$PIC.fallback(DefBootstrap.java:206)\n\tat org.elasticsearch.painless.PainlessScript$Script.execute(ArrayList arr = ctx.payload.aggregations.node_name.buckets; for (int i = 0; i < arr.length; i++) { if (arr[i]['unack_count'].value > 1) { if (arr[i]['unack_count'] != null) { return true; }}} return false;:125)\n\tat org.elasticsearch.xpack.watcher.condition.ScriptCondition.doExecute(ScriptCondition.java:60)\n\tat org.elasticsearch.xpack.watcher.condition.ScriptCondition.execute(ScriptCondition.java:55)\n\tat org.elasticsearch.xpack.watcher.execution.ExecutionService.executeInner(ExecutionService.java:513)\n\tat org.elasticsearch.xpack.watcher.execution.ExecutionService.execute(ExecutionService.java:320)\n\tat org.elasticsearch.xpack.watcher.transport.actions.execute.TransportExecuteWatchAction$1.doRun(TransportExecuteWatchAction.java:159)\n\tat org.elasticsearch.common.util.concurrent.AbstractRunnable.run(AbstractRunnable.java:37)\n\tat org.elasticsearch.xpack.watcher.execution.ExecutionService$WatchExecutionTask.run(ExecutionService.java:627)\n\tat org.elasticsearch.common.util.concurrent.ThreadContext$ContextPreservingRunnable.run(ThreadContext.java:703)\n\tat java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)\n\tat java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)\n\tat java.base/java.lang.Thread.run(Thread.java:830)\n"
    },
    "stack_trace": "ScriptException[runtime error]; nested: NullPointerException;\n\tat org.elasticsearch.painless.PainlessScript.convertToScriptException(PainlessScript.java:94)\n\tat org.elasticsearch.painless.PainlessScript$Script.execute(ArrayList arr = ctx.payload.aggregations.node_name.buckets; for (int i = 0; i < arr.length; i++) { if (arr[i]['unack_count'].value > 1) { if (arr[i]['unack_count'] != null) { return true; }}} return false;:93)\n\tat org.elasticsearch.xpack.watcher.condition.ScriptCondition.doExecute(ScriptCondition.java:60)\n\tat org.elasticsearch.xpack.watcher.condition.ScriptCondition.execute(ScriptCondition.java:55)\n\tat org.elasticsearch.xpack.watcher.execution.ExecutionService.executeInner(ExecutionService.java:513)\n\tat org.elasticsearch.xpack.watcher.execution.ExecutionService.execute(ExecutionService.java:320)\n\tat org.elasticsearch.xpack.watcher.transport.actions.execute.TransportExecuteWatchAction$1.doRun(TransportExecuteWatchAction.java:159)\n\tat org.elasticsearch.common.util.concurrent.AbstractRunnable.run(AbstractRunnable.java:37)\n\tat org.elasticsearch.xpack.watcher.execution.ExecutionService$WatchExecutionTask.run(ExecutionService.java:627)\n\tat org.elasticsearch.common.util.concurrent.ThreadContext$ContextPreservingRunnable.run(ThreadContext.java:703)\n\tat java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)\n\tat java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)\n\tat java.base/java.lang.Thread.run(Thread.java:830)\nCaused by: java.lang.NullPointerException\n\tat org.elasticsearch.painless.DefBootstrap$PIC.fallback(DefBootstrap.java:206)\n\tat org.elasticsearch.painless.PainlessScript$Script.execute(ArrayList arr = ctx.payload.aggregations.node_name.buckets; for (int i = 0; i < arr.length; i++) { if (arr[i]['unack_count'].value > 1) { if (arr[i]['unack_count'] != null) { return true; }}} return false;:125)\n\t... 11 more\n"
  }
}

When I plug in the change you suggested, I get this illegal argument exception:

 "exception": {
    "type": "script_exception",
    "reason": "runtime error",
    "script_stack": [
      "if (arr[i].contains('unack_count') && arr[i]['unack_count'].value > 1)  { ",
      "          ^---- HERE"
    ],
    "script": "ArrayList arr = ctx.payload.aggregations.node_name.buckets; for (int i = 0; i < arr.length; i++) { if (arr[i].contains('unack_count') && arr[i]['unack_count'].value > 1)  { return true; }}return false;",
    "lang": "painless",
    "caused_by": {
      "type": "illegal_argument_exception",
      "reason": "dynamic method [java.util.HashMap, contains/1] not found",
      "stack_trace": "java.lang.IllegalArgumentException: dynamic method [java.util.HashMap, contains/1] not found\n\tat org.elasticsearch.painless.Def.lookupMethod(Def.java:205)\n\tat org.elasticsearch.painless.DefBootstrap$PIC.lookup(DefBootstrap.java:151)\n\tat org.elasticsearch.painless.DefBootstrap$PIC.fallback(DefBootstrap.java:207)\n\tat org.elasticsearch.painless.PainlessScript$Script.execute(ArrayList arr = ctx.payload.aggregations.node_name.buckets; for (int i = 0; i < arr.length; i++) { if (arr[i].contains('unack_count') && arr[i]['unack_count'].value > 1)  { return true; }}return false;:110)\n\tat org.elasticsearch.xpack.watcher.condition.ScriptCondition.doExecute(ScriptCondition.java:60)\n\tat org.elasticsearch.xpack.watcher.condition.ScriptCondition.execute(ScriptCondition.java:55)\n\tat org.elasticsearch.xpack.watcher.execution.ExecutionService.executeInner(ExecutionService.java:513)\n\tat org.elasticsearch.xpack.watcher.execution.ExecutionService.execute(ExecutionService.java:320)\n\tat org.elasticsearch.xpack.watcher.transport.actions.execute.TransportExecuteWatchAction$1.doRun(TransportExecuteWatchAction.java:159)\n\tat org.elasticsearch.common.util.concurrent.AbstractRunnable.run(AbstractRunnable.java:37)\n\tat org.elasticsearch.xpack.watcher.execution.ExecutionService$WatchExecutionTask.run(ExecutionService.java:627)\n\tat org.elasticsearch.common.util.concurrent.ThreadContext$ContextPreservingRunnable.run(ThreadContext.java:703)\n\tat java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)\n\tat java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)\n\tat java.base/java.lang.Thread.run(Thread.java:830)\n"
    },
    "stack_trace": "ScriptException[runtime error]; nested: IllegalArgumentException[dynamic method [java.util.HashMap, contains/1] not found];\n\tat org.elasticsearch.painless.PainlessScript.convertToScriptException(PainlessScript.java:94)\n\tat org.elasticsearch.painless.PainlessScript$Script.execute(ArrayList arr = ctx.payload.aggregations.node_name.buckets; for (int i = 0; i < arr.length; i++) { if (arr[i].contains('unack_count') && arr[i]['unack_count'].value > 1)  { return true; }}return false;:93)\n\tat org.elasticsearch.xpack.watcher.condition.ScriptCondition.doExecute(ScriptCondition.java:60)\n\tat org.elasticsearch.xpack.watcher.condition.ScriptCondition.execute(ScriptCondition.java:55)\n\tat org.elasticsearch.xpack.watcher.execution.ExecutionService.executeInner(ExecutionService.java:513)\n\tat org.elasticsearch.xpack.watcher.execution.ExecutionService.execute(ExecutionService.java:320)\n\tat org.elasticsearch.xpack.watcher.transport.actions.execute.TransportExecuteWatchAction$1.doRun(TransportExecuteWatchAction.java:159)\n\tat org.elasticsearch.common.util.concurrent.AbstractRunnable.run(AbstractRunnable.java:37)\n\tat org.elasticsearch.xpack.watcher.execution.ExecutionService$WatchExecutionTask.run(ExecutionService.java:627)\n\tat org.elasticsearch.common.util.concurrent.ThreadContext$ContextPreservingRunnable.run(ThreadContext.java:703)\n\tat java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)\n\tat java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)\n\tat java.base/java.lang.Thread.run(Thread.java:830)\nCaused by: java.lang.IllegalArgumentException: dynamic method [java.util.HashMap, contains/1] not found\n\tat org.elasticsearch.painless.Def.lookupMethod(Def.java:205)\n\tat org.elasticsearch.painless.DefBootstrap$PIC.lookup(DefBootstrap.java:151)\n\tat org.elasticsearch.painless.DefBootstrap$PIC.fallback(DefBootstrap.java:207)\n\tat org.elasticsearch.painless.PainlessScript$Script.execute(ArrayList arr = ctx.payload.aggregations.node_name.buckets; for (int i = 0; i < arr.length; i++) { if (arr[i].contains('unack_count') && arr[i]['unack_count'].value > 1)  { return true; }}return false;:110)\n\t... 11 more\n"
  }
}

Thanks again!
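The IllegalArgumentException comes from the method name: the bucket is a java.util.HashMap, and Java maps offer containsKey(), not contains(). Even with a correct key check, though, the script would not fire, because unack_count never appears directly on a node_name bucket. A Python model of the suggested condition with the key check fixed (Python's `in` plays the role of containsKey) shows it silently returns False; the payload below is a trimmed placeholder in the shape of the nested terms response shown later in the thread.

```python
def suggested_condition(payload, threshold=1):
    """Model of the suggested script, with contains() replaced by a key check."""
    buckets = payload["aggregations"]["node_name"]["buckets"]
    for bucket in buckets:
        # 'in' is Python's analogue of Map.containsKey in Painless
        if "unack_count" in bucket and bucket["unack_count"]["value"] > threshold:
            return True
    return False


# Placeholder payload: unack_count is nested under queue_name > get_latest.
payload = {
    "aggregations": {
        "node_name": {
            "buckets": [
                {
                    "key": "rabbit@node1",
                    "queue_name": {
                        "buckets": [
                            {
                                "key": "orders",
                                "get_latest": {
                                    "buckets": [
                                        {
                                            "key": 1588721268561,
                                            "unack_count": {
                                                "buckets": [{"key": 500, "doc_count": 100}]
                                            },
                                        }
                                    ]
                                },
                            }
                        ]
                    },
                }
            ]
        }
    }
}

print(suggested_condition(payload))  # False, although a queue has 500 unacked messages
```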

I updated my query to only look for docs that include the rabbitmq.queue.messages.unacknowledged.count field. Still no success. :confused:

 "body": {
          "size": 0,
          "query": {
            "bool": {
              "filter": [
                {
                  "bool": {
                    "minimum_should_match": 1,
                    "should": [
                      {
                        "match_phrase": {
                          "event.dataset": "rabbitmq.queue"
                        }
                      }
                    ]
                  }
                },
                {
                  "exists": {
                    "field": "rabbitmq.queue.messages.unacknowledged.count"
                  }
                },

Hi @cjcenizal. Could you give me any feedback on these results? :slight_smile:
Also, would an array_compare condition be applicable here?

"condition": {
    "array_compare": {
      "ctx.payload.aggregations.node_name.buckets" : { 
        "path": "unsure which path leads to unack_count.value", 
        "gte": { 
          "value": 0 
        }
      }
    }
  }
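On the array_compare question: as I understand the Watcher docs, array_compare resolves `path` against each element of the given array and compares the result, with an optional quantifier (`some` or `all`, `some` being the default). The Python model below is only a sketch of those semantics under that assumption, including the assumption that a numeric path segment indexes into a list. It shows why a single path is awkward here: as far as I know, one dotted path cannot fan out over the nested queue_name and get_latest bucket lists, so it can only reach one fixed queue.

```python
import operator

# Comparison operators named as in the array_compare condition.
OPS = {"eq": operator.eq, "gt": operator.gt, "gte": operator.ge,
       "lt": operator.lt, "lte": operator.le}


def resolve(obj, path):
    """Follow a dotted path through dicts; numeric parts index into lists (assumption)."""
    for part in path.split("."):
        if isinstance(obj, list) and part.isdigit():
            idx = int(part)
            obj = obj[idx] if idx < len(obj) else None
        elif isinstance(obj, dict):
            obj = obj.get(part)
        else:
            return None
        if obj is None:
            return None
    return obj


def array_compare(array, path, op, value, quantifier="some"):
    """Sketch: compare the value at `path` in each element against `value`."""
    results = []
    for element in array:
        resolved = resolve(element, path)
        results.append(resolved is not None and OPS[op](resolved, value))
    return any(results) if quantifier == "some" else all(results)


# Placeholder node_name buckets.
node_buckets = [
    {"key": "rabbit@node1", "doc_count": 30,
     "queue_name": {"buckets": [{"key": "a", "doc_count": 10},
                                {"key": "b", "doc_count": 20}]}},
]

print(array_compare(node_buckets, "doc_count", "gte", 25))  # True
# A numeric index only reaches one fixed queue, not every queue:
print(array_compare(node_buckets, "queue_name.buckets.0.doc_count", "gte", 25))  # False
```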

Hello @18visions

I have a few comments on the watch.

  1. From the field names, it seems you're not using the index template that comes with Metricbeat. In 7.5 or 7.6, for example, the field should be rabbitmq.node.name, indexed directly as a keyword, so no .keyword suffix is needed.

  2. I don't have any RabbitMQ data right now, but to get the latest event you should probably use a top_hits aggregation. That way you get the latest value of rabbitmq.queue.messages.unacknowledged.count directly.

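  3. arr[i]['unack_count'].value triggers a null pointer exception because node_name buckets have no unack_count key: arr[i]['unack_count'] is null, and reading .value from it fails.
    You have these nested aggregations: node_name > queue_name > get_latest > unack_count.
    A typical response looks like this: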

    "aggregations" : {
     "node_name" : {
       "doc_count_error_upper_bound" : 0,
       "sum_other_doc_count" : 3207075,
       "buckets" : [
         {
           "key" : "...",
           "doc_count" : 5522102,
           "queue_name" : {
             "doc_count_error_upper_bound" : 0,
             "sum_other_doc_count" : 0,
             "buckets" : [
               {
                 "key" : "...",
                 "doc_count" : 1704400,
                 "get_latest" : {
                   "doc_count_error_upper_bound" : 0,
                   "sum_other_doc_count" : 1704300,
                   "buckets" : [
                     {
                       "key" : 1588721268561,
                       "key_as_string" : "2020-05-05T23:27:48.561Z",
                       "doc_count" : 100,
                       "unack_count" : {
                         "doc_count_error_upper_bound" : 0,
                         "sum_other_doc_count" : 0,
                         "buckets" : [
                           {
                             "key" : ...,
                             "doc_count" : 100
                           }
                         ]
                       }
                     }
                   ]
                 }
               },
    
  4. Another optimization would be to add a range filter to the query so that only documents where rabbitmq.queue.messages.unacknowledged.count is gt 0 are matched.
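With the nested terms aggregations from point 3, the number you want is the key of the unack_count bucket four levels down: unack_count is itself a terms aggregation, so it has buckets rather than a value. A Python sketch of walking that shape and collecting the latest count per node and queue (sample keys and counts are placeholders):

```python
def latest_counts(payload):
    """Collect (node, queue) -> unacked count from the nested terms response."""
    counts = {}
    node_buckets = payload.get("aggregations", {}).get("node_name", {}).get("buckets", [])
    for node in node_buckets:
        for queue in node.get("queue_name", {}).get("buckets", []):
            # get_latest is a terms agg on @timestamp (size 1, newest first)
            for ts in queue.get("get_latest", {}).get("buckets", [])[:1]:
                # unack_count is a terms agg too: the count is a bucket *key*
                unack_buckets = ts.get("unack_count", {}).get("buckets", [])
                if unack_buckets:
                    counts[(node["key"], queue["key"])] = unack_buckets[0]["key"]
    return counts


sample = {
    "aggregations": {"node_name": {"buckets": [{
        "key": "rabbit@node1",
        "queue_name": {"buckets": [
            {"key": "orders", "get_latest": {"buckets": [{
                "key": 1588721268561,
                "unack_count": {"buckets": [{"key": 42, "doc_count": 100}]}}]}},
            {"key": "empty", "get_latest": {"buckets": []}},
        ]},
    }]}}
}

print(latest_counts(sample))  # {('rabbit@node1', 'orders'): 42}
```

If several documents share the newest timestamp with different counts, unack_count can hold more than one bucket; this sketch just takes the first one (terms buckets are ordered by doc_count by default), which is one reason top_hits is the cleaner way to get "the latest value".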


Using top_hits, the aggregations become:

          "aggs": {
            "node_name": {
              "terms": {
                "field": "rabbitmq.node.name.keyword",
                "size": "2"
              },
              "aggs": {
                "queue_name": {
                  "terms": {
                    "field": "rabbitmq.queue.name.keyword",
                    "size": "100"
                  },
                  "aggs": {
                    "get_latest": {
                      "top_hits": {
                        "size": 1,
                        "sort": [ {"@timestamp": "desc"}],
                        "docvalue_fields": [ "rabbitmq.queue.messages.unacknowledged.count" ],
                        "_source": "_none_"
                      }
                    }
                  }
                }
              }
            }
          }
        }

The response would be:

{
  "took" : 1227,
  "timed_out" : false,
  "_shards" : {
    "total" : 9,
    "successful" : 9,
    "skipped" : 0,
    "failed" : 0
  },
  "hits" : {
    "total" : {
      "value" : 10000,
      "relation" : "gte"
    },
    "max_score" : null,
    "hits" : [ ]
  },
  "aggregations" : {
    "node_name" : {
      "doc_count_error_upper_bound" : 0,
      "sum_other_doc_count" : 3218685,
      "buckets" : [
        {
          "key" : "...",
          "doc_count" : 5540663,
          "queue_name" : {
            "doc_count_error_upper_bound" : 0,
            "sum_other_doc_count" : 0,
            "buckets" : [
              {
                "key" : "...",
                "doc_count" : 1710100,
                "get_latest" : {
                  "hits" : {
                    "total" : {
                      "value" : 1710100,
                      "relation" : "eq"
                    },
                    "max_score" : null,
                    "hits" : [
                      {
                        "_index" : "metricbeat-7.5.1-2020.05.05-000114",
                        "_type" : "_doc",
                        "_id" : "HFQ053EBeRTJR4sy-HNV",
                        "_score" : null,
                        "_source" : { },
                        "fields" : {
                          "rabbitmq.queue.messages.unacknowledged.count" : [
                            ...
                          ]
                        },
                        "sort" : [
                          1588721938560
                        ]
                      }
                    ]
                  }
                }
              }, ...

The condition to handle this (and avoid any null pointer exception in case some of the aggregations have no data) would be:

    "condition": {
      "script": {
        "source": """
if (ctx.payload.aggregations?.node_name != null) {
  for(int i = 0; i < ctx.payload.aggregations.node_name.buckets.length; i++) {
    def nn = ctx.payload.aggregations.node_name.buckets[i];
    if (nn?.queue_name != null) {
      for(int j = 0; j < nn.queue_name.buckets.length; j++) {
        def qn = nn.queue_name.buckets[j];
        if (qn?.get_latest != null && qn.get_latest.hits.hits.length > 0) {
          def latest = qn.get_latest.hits.hits[0];
          if (latest.fields != null && latest.fields['rabbitmq.queue.messages.unacknowledged.count'][0] > 0)
            return true;
        }
      }
    }
  }
}
return false;
        """
      }
    }
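To sanity-check this logic outside Watcher, here is a line-by-line Python port run against trimmed payloads in the top_hits shape shown above. The `threshold` parameter is an addition for illustration (the script hardcodes `> 0`; the original question asked for "X"), and the sample values are placeholders.

```python
FIELD = "rabbitmq.queue.messages.unacknowledged.count"


def condition(payload, threshold=0):
    """Python port of the Painless condition: True if any queue's latest count > threshold."""
    node_name = (payload.get("aggregations") or {}).get("node_name")
    if node_name is None:
        return False
    for nn in node_name.get("buckets", []):
        qn_agg = nn.get("queue_name")
        if qn_agg is None:
            continue
        for qn in qn_agg.get("buckets", []):
            latest_hits = (qn.get("get_latest") or {}).get("hits", {}).get("hits", [])
            if latest_hits:
                fields = latest_hits[0].get("fields")
                # docvalue_fields returns a list of values per field
                if fields is not None and fields[FIELD][0] > threshold:
                    return True
    return False


def payload_with(count):
    """Build a minimal top_hits-shaped payload with one queue (placeholder keys)."""
    hit = {"_source": {}, "fields": {FIELD: [count]}, "sort": [1588721938560]}
    return {"aggregations": {"node_name": {"buckets": [{
        "key": "rabbit@node1",
        "queue_name": {"buckets": [{
            "key": "orders",
            "get_latest": {"hits": {"hits": [hit]}},
        }]},
    }]}}}


print(condition(payload_with(5)))                # True
print(condition(payload_with(0)))                # False
print(condition(payload_with(5), threshold=10))  # False
print(condition({"aggregations": {}}))           # False: no node_name aggregation
```

The `?.` null-safe operators in the Painless version correspond to the `.get(...)` calls and `None` checks here; returning as soon as one queue is over the threshold keeps the check cheap.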

You're a wizard :slight_smile: Thank you very much for your comments and suggestions; it worked like a charm.
