Watcher nested aggregations with "direction"

Hi,
I need a bit of help here with the watcher aggregation. So far I've dealt with somewhat simplified aggs of the form: time->host->latency.
Now I need to work with a slightly different structure: time->host->direction(in/out)->count.
Here's the Execute API output for just ONE host:

{
  "took": 120,
  "timed_out": false,
  "_shards": {
    "total": 90,
    "successful": 90,
    "failed": 0
  },
  "hits": {
    "total": 12388488,
    "max_score": 3.4217281,
    "hits": [
      {
        "_index": "metrics-logstash-events-2018.05.15",
        "_type": "logs",
        "_id": "AWNhsEzjxT0d2iHVWExL",
        "_score": 3.4217281,
        "_source": {
          "hostname": "idb-syslog-to-elk01",
          "@timestamp": "2018-05-15T02:45:34.048Z",
          "role": "idb-syslog-to-elk",
          "@version": "1",
          "message": "390d1450067e",
          "env": "dev",
          "events": {
            "rate_1m": 2369.6364088497176,
            "rate_15m": 1313.484153030162,
            "count": 12334829429,
            "rate_5m": 1325.9731311137257
          },
          "direction": "in"
        }
      },
      {
        "_index": "metrics-logstash-events-2018.05.15",
        "_type": "logs",
        "_id": "AWNhsEzjxT0d2iHVWExM",
        "_score": 3.4217281,
        "_source": {
          "hostname": "idb-syslog-to-elk01",
          "@timestamp": "2018-05-15T02:45:34.049Z",
          "role": "idb-syslog-to-elk",
          "latency": {
            "min": 0,
            "rate_1m": 2364.295865870752,
            "rate_15m": 1296.0409708266031,
            "max": 1373715744447,
            "p5": 45809,
            "mean": 1773511221.7431247,
            "count": 12015839728,
            "rate_5m": 1316.2510911377335,
            "stddev": 221730.6164462496,
            "p95": 45809
          },
          "@version": "1",
          "message": "390d1450067e",
          "env": "dev",
          "events": {
            "rate_1m": 2369.6299397889406,
            "rate_15m": 1313.4818294550885,
            "count": 12334829429,
            "rate_5m": 1325.9668145518926
          },
          "direction": "out"
        }
      },
      {
        "_index": "metrics-logstash-events-2018.05.15",
        "_type": "logs",
        "_id": "AWNhsDmdxT0d2iHVV-3X",
        "_score": 3.4217281,
        "_source": {
          "hostname": "idb-syslog-to-elk01",
          "@timestamp": "2018-05-15T02:45:29.115Z",
          "role": "idb-syslog-to-elk",
          "@version": "1",
          "message": "390d1450067e",
          "env": "dev",
          "events": {
            "rate_1m": 2157.43726480526,
            "rate_15m": 1293.996092725291,
            "count": 12334805105,
            "rate_5m": 1267.3925343519945
          },
          "direction": "in"
        }
      },
      {
        "_index": "metrics-logstash-events-2018.05.15",
        "_type": "logs",
        "_id": "AWNhsDmdxT0d2iHVV-3Y",
        "_score": 3.4217281,
        "_source": {
          "hostname": "idb-syslog-to-elk01",
          "@timestamp": "2018-05-15T02:45:29.115Z",
          "role": "idb-syslog-to-elk",
          "latency": {
            "min": 0,
            "rate_1m": 2153.249022335875,
            "rate_15m": 1276.5615835790086,
            "max": 1373715744447,
            "p5": 45809,
            "mean": 1773514788.8849082,
            "count": 12015815557,
            "rate_5m": 1257.8264228374765,
            "stddev": 221730.7278094322,
            "p95": 45809
          },
          "@version": "1",
          "message": "390d1450067e",
          "env": "dev",
          "events": {
            "rate_1m": 2157.4128527470007,
            "rate_15m": 1293.9948704087974,
            "count": 12334805105,
            "rate_5m": 1267.3894728980138
          },
          "direction": "out"
        }
      }
    ]
  }
}

For each host I need to do the following, mnemonically, as a condition:
(metricHostOUT / metricHostIN) * 100 > 20
where Host is the hostname,
where my "metric" is events.count,
and where IN/OUT is the value of the direction field.

I came up with the following aggregation layout, but I'm not sure if it's correct and/or how to do the condition part of the watcher:

          "aggregations":{
             "minutes":{
               "date_histogram":{
                  "field": "@timestamp",
                  "interval": "minute",
                  "offset": 0,
                  "order":{
                     "_key": "asc"
                  },
                  "keyed": false,
                  "min_doc_count": 0
               },
               "aggregations":{
                   "nodes":{
                      "terms":{
                        "field": "hostname.keyword",
                        "size": 10,
                        "min_doc_count": 1,
                        "shard_min_doc_count": 0,
                        "show_term_doc_count_error": false,
                        "order": [
                           {
                             "eventCnt": "desc"
                           },
                           {
                             "_term": "asc"
                           }
                        ]
                      },
                      "terms":{
                        "field": "direction.keyword",
                        "size": 10,
                        "min_doc_count": 1,
                        "shard_min_doc_count": 0,
                        "show_term_doc_count_error": false,
                        "order": [
                           {
                             "eventCnt": "desc"
                           },
                           {
                             "_term": "asc"
                           }
                        ]
                      },
                      "aggregations":{
                         "eventCnt":{
                            "sum":{
                               "field": "events.count"
                            }
                         }
                      }
                   }
               }
             }
          },

I thought of summing all events.count metrics per host/direction combo and then doing the math...
Any help will be greatly appreciated!

can you maybe share an aggregation output as well as the desired output you would like to have after transforming it?

--Alex

Alex,
I'm not sure how to "share an aggregation"...
I've shared the output of the Execute API and my feeble attempt at an aggregation layout.
When I try to piece together the query AND the aggregation code in the Dev Tools GUI, my second terms definition is marked with the red check mark. Something is wrong with the way I'm defining the aggs.

Ideally I'd like to get a list of hosts, something like this:
hostA:23, hostB:30, hostC:25 ...
with the threshold set at 20%.

The metric is calculated, in mnemonic code, as:

(hosts[hostA].events.count[direction=IN] / hosts[hostA].events.count[direction=OUT]) * 100 > threshold

can you share the full execute watch API response please? If it is too big to put here, just store it in a gist and link to it.

Thank you!

Thanks for getting back, Alex. Here's a link to an Execute API gist: ExecuteAPI

And here's another watcher I created for the query, but concentrating on the latency metric.
I'm trying to take this watcher and modify it for the mnemonic code I provided previously, but am having a hard time laying out the aggregation definitions and then doing the condition and the transform parts and iterating over the aggs lists in the webhook.

here's the latency watcher: Latency
Your help would be greatly appreciated.
Thanks

the output you pasted above is not from the execute watch API, but from a search. In addition, this search does not include any aggregation that your requirement could be checked against.

You're right, Alex - I don't have an Execute API response as I'm struggling with laying out the aggregations in the watch.
WRT "this search does not include any aggregation"... that's what I'm struggling with - laying out the aggregation definitions. Given a sample search hit:

  {
    "_index": "metrics-logstash-events-2018.05.15",
    "_type": "logs",
    "_id": "AWNhn5cNE5CHCIs4VPOu",
    "_score": 1.2332723,
    "_source": {
      "hostname": "idb-syslog-to-elk01",
      "@timestamp": "2018-05-15T02:27:18.923Z",
      "role": "idb-syslog-to-elk",
      "@version": "1",
      "message": "390d1450067e",
      "env": "dev",
      "events": {
        "rate_1m": 1359.9970463953305,
        "rate_15m": 1430.3099750447518,
        "count": 12333460050,
        "rate_5m": 1437.3180885655572
      },
      "direction": "in"
    }
  },

I'm trying to aggregate:

  1. Temporally (@timestamp)

  2. By host (hostname)

  3. by metric sum (events.count)

  4. By direction (direction)

I'm having a hard time coming up with the definition of the aggregations.
In the condition portion of the watch I want to do, mnemonically (as indicated above):

(hosts[hostA].events.count[direction=IN] / hosts[hostA].events.count[direction=OUT]) * 100 > threshold

I'm trying to mimic a similar watch I did previously, but am having a hard time introducing another aggregation level (direction) and then coming up with the condition statement.
Is that something you can nudge me slightly in the right direction on?
Thanks

Alex,
I was able to come up with a search and the corresponding aggs; here's the gist link: searchWithAggs
Also including the output with the aggs.
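
The general shape is each level nested as a sub-aggregation of the previous one. Here is a minimal sketch of that layout (not the full query; the agg names follow my earlier attempt, and the sizes/interval are just placeholders):

  "aggregations": {
    "minutes": {
      "date_histogram": { "field": "@timestamp", "interval": "minute" },
      "aggregations": {
        "nodes": {
          "terms": { "field": "hostname.keyword", "size": 10 },
          "aggregations": {
            "dir": {
              "terms": { "field": "direction.keyword", "size": 10 },
              "aggregations": {
                "eventCnt": { "sum": { "field": "events.count" } }
              }
            }
          }
        }
      }
    }
  }

That produces, per minute and per host, one bucket for direction "in" and one for "out", each with the summed events.count.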

If you have any cycles, could you help with the condition for the algorithm outlined earlier?
Thanks

Alex,
any chance you can help me out with coding the condition portion of this watcher?
I'm a bit lost with Painless and the aggregations.
Thanks
Vlad

how about a bit of pseudocode that we can turn into proper Painless syntax together? Otherwise I am just hoping I've got the requirement right...

Hey,
the mnemonic code from earlier in this thread - in the condition portion of the watch I want to do, mnemonically (as indicated above):

(hosts[hostA].events.count[direction=IN] / hosts[hostA].events.count[direction=OUT]) * 100 > threshold
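
Or, as rough pseudocode (just a sketch of the intent; threshold being the percentage I'd configure, e.g. 20):

  for each time bucket:
    for each host bucket:
      inCnt  = sum of events.count where direction == "in"
      outCnt = sum of events.count where direction == "out"
      if (inCnt / outCnt) * 100 > threshold:
        flag this host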

Thanks
Vlad

bump!
Any helpful hints, Alex?
Thanks

Take a look at this example

POST _xpack/watcher/watch/_execute
{
  "watch": {
    "trigger": {
      "schedule": {
        "interval": "10h"
      }
    },
    "input": {
      "simple": {
        "aggregations": {
          "minutes": {
            "buckets": [
              {
                "key": 1532113800000,
                "nodes": {
                  "buckets": [
                    {
                      "key": "idb-syslog-to-elk01",
                      "dir": {
                        "buckets": [
                          {
                            "key": "in",
                            "eventCnt": {
                              "value": 41
                            }
                          },
                          {
                            "key": "out",
                            "eventCnt": {
                              "value": 20
                            }
                          }
                        ]
                      }
                    }
                  ]
                }
              }
            ]
          }
        }
      }
    },
    "condition" : {
      "script" : """
      for (def i = 0 ; i < ctx.payload.aggregations.minutes.buckets.size() ; i++ ) {
        def b = ctx.payload.aggregations.minutes.buckets[i];
        for (def x = 0 ; x < b.nodes.buckets.size() ; x++ ) {
          
          def b2 = b.nodes.buckets[x];

          def input = b2.dir.buckets.stream().filter(bucket -> bucket.key == 'in').findFirst().get().eventCnt.value;
          def out = b2.dir.buckets.stream().filter(bucket -> bucket.key == 'out').findFirst().get().eventCnt.value;
          def result = (input*1.0)/out > 2.0;
          if (result == true) {
            return true
          }
        }
      }
      return false;
      """
    },
    "actions": {
      "logme": {
        "logging": {
          "text": "{{ctx}}"
        }
      }
    }
  }
}
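
Note that the simple input above just hardcodes a payload with the same shape as your aggregation response, so the condition script can be tested in isolation; in your real watch the search input will put that same structure under ctx.payload.aggregations.

If you later also want the hostA:23, hostB:30 style list from your earlier message, a transform with the same kind of loop can build it. A rough, untested sketch (same agg names as above):

    "transform": {
      "script": """
        // collect host -> ratio into a map that becomes the new payload
        // (later time buckets overwrite earlier ones per host)
        def hosts = [:];
        for (def minute : ctx.payload.aggregations.minutes.buckets) {
          for (def node : minute.nodes.buckets) {
            def inCnt  = node.dir.buckets.stream().filter(b -> b.key == 'in').findFirst().get().eventCnt.value;
            def outCnt = node.dir.buckets.stream().filter(b -> b.key == 'out').findFirst().get().eventCnt.value;
            // use whichever ratio you settle on (in/out or out/in)
            hosts[node.key] = Math.round((inCnt * 1.0) / outCnt * 100);
          }
        }
        return ['hosts': hosts];
      """
    }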

Thanks Alex - this looks promising and executes fine with the Execute API in Dev Tools.
However... when I incorporated it into the existing watch (just the condition portion), I get a json_parse exception and a run-time null-pointer exception.
I cannot figure out the parsing error: es_watcher_forwader_io_event_count parseError/Null-point exception

And here's my condition clause:

  "condition": {
    "script": """
      if (ctx.payload.aggregations.minutes.buckets.size() == 0) return false;
      for (def i = 0 ; i < ctx.payload.aggregations.minutes.buckets.size() ; i++ ) {
        def b = ctx.payload.aggregations.minutes.buckets[i];
        for (def x = 0 ; x < b.nodes.buckets.size() ; x++ ) {
          def b2 = b.nodes.buckets[x];
          def input = b2.dir.buckets.stream().filter(bucket -> bucket.key == 'in').findFirst().get().eventCnt.value;
          def out = b2.dir.buckets.stream().filter(bucket -> bucket.key == 'out').findFirst().get().eventCnt.value;
          def result = (out/input)*100 > {{ env2forwaderIO[env].ioCountDiffThr }};
          if (result == true) {
            return true
          }
        }
      }
      return false;
      """
  },

Must be something basic I'm overlooking...
Thanks

one of the great parts about Painless is its exact error messages

"exception": {
      "type": "script_exception",
      "reason": "runtime error",
      "script_stack": [
        "if (ctx.payload.aggregations.minutes.buckets.size() == 0) ",
        "                            ^---- HERE"
      ],
      "script": """

so we know something seems fishy with the aggs payload. Checking out the result field of the watch execution, you can see what was returned by your request. And it is this:

"payload": {
          "_headers": {
            "content-length": [
              "539"
            ],
            "content-type": [
              "application/json; charset=UTF-8"
            ]
          },
          "error": {
            "root_cause": [
              {
                "type": "json_parse_exception",
                "reason": "Illegal unquoted character ((CTRL-CHAR, code 10)): has to be escaped using backslash to be included in name\n at [Source: org.elasticsearch.transport.netty4.ByteBufStreamInput@72a4f9da; line: 60, column: 40]"
              }
            ],
            "type": "json_parse_exception",
            "reason": "Illegal unquoted character ((CTRL-CHAR, code 10)): has to be escaped using backslash to be included in name\n at [Source: org.elasticsearch.transport.netty4.ByteBufStreamInput@72a4f9da; line: 60, column: 40]"
          },
          "_status_code": 500,
          "status": 500
        },

this gives you a first indication of where to look.

Yes, I saw all of this. I've been looking at the JSON and don't see anything wrong - it looks like everything is properly formatted/quoted, with matching/balanced curlies:

 49           "aggregations":{
 50              "minutes":{
 51                "date_histogram":{
 52                   "field": "@timestamp",
 53                   "interval": "minute",
 54                   "offset": 0,
 55                   "order":{
 56                      "_key": "asc"
 57                   },
 58                   "keyed": false,
 59                   "min_doc_count": 0
 60                },
 61                "aggregations":{
 62                    "nodes":{
 63                       "terms":{

What am I missing?

putting this in a web JSON parser shows that one aggregations field has an unclosed double quote. Alternatively you could just copy the JSON and paste it into the Kibana Dev Tools as a search operation and you would also see where the JSON is broken.

Thanks - that definitely helped. The confusing part was the line number - I was concentrating on the wrong block of code. I was able to identify other JSON "mishaps" as well!
Thanks a bunch!
