Aggregate watchers over multiple fields for term aggregation

I have a scenario where I want to aggregate my results by the combination of two fields' values.
The following is the JSON of a document from the index that my watcher targets:

{
  "_index": ".monitoring-es-6-2018.08.08",
  "_type": "doc",
  "_id": "AWUYhYcMSsCneLp5ymFB",
  "_version": 1,
  "_score": null,
  "_source": {
    "cluster_uuid": "If7i-iVaQsWQvxjV9J1Atg",
    "timestamp": "2018-08-08T07:52:06.733Z",
    "type": "node_stats",
    "source_node": {
      "uuid": "iu7bnPFDTWeqXAjdfTIrkg",
      "host": "data-1-node.elastic.autoip.dcos.thisdcos.directory",
      "transport_address": "16.234.226.205:1026",
      "ip": "16.234.226.205",
      "name": "data-1-node",
      "attributes": {
        "ml.max_open_jobs": "10",
        "ml.enabled": "true"
      }
    }
}

The following is my watcher body:

{
  "trigger": {
    "schedule": {
      "interval": "1m"
    }
  },
  "input": {
    "search": {
      "request": {
        "indices": [
          ".monitoring-es-6-*"
        ],
        "body": {
          "size" : 0,
          "query": {
            "bool": {
              "filter": {
                "range": {
                  "timestamp": {
                    "gte": "now-2m",
                    "lte": "now"
                  }
                }
              }
            }
          },
          "aggs": {
            "minutes": {
              "date_histogram": {
                "field": "timestamp",
                "interval": "1m"
              },
              "aggs": {
                "nodes": {
                  "terms": {
                    "field": "source_node.ip",
                    "order": {
                      "free_disk": "desc"
                    }
                  },
                  "aggs": {
                    "free_disk": {
                      "avg": {
                        "field": "node_stats.fs.total.free_in_bytes"
                      }
                    },
                    "total_disk": {
                      "avg": {
                        "field": "node_stats.fs.total.total_in_bytes"
                      }
                    }
                  }
                }
              }
            }
          }
        }
      }
    }
  },
  "condition": {
    "script":  "if (ctx.payload.aggregations.minutes.buckets.size() == 0) return false; def latest = ctx.payload.aggregations.minutes.buckets.get(ctx.payload.aggregations.minutes.buckets.size() - 1); def node = latest.nodes.buckets[0]; def status = (node != null && node.free_disk != null && node.free_disk.value / 1000000000L <= 7000); return status;"
  },
  "actions": {
    "send_email": { 
      "transform": {
        "script": "def latest = ctx.payload.aggregations.minutes.buckets.get(ctx.payload.aggregations.minutes.buckets.size() - 1); def result = latest.nodes.buckets.stream().filter(it -> it != null && it.free_disk != null && it.free_disk.value > 0).filter(it -> (float) it.free_disk.value / it.total_disk.value * 100 < 100).collect(Collectors.toList());  return result;"
      },
      "email": {
        "to": "sahil.sawhney@knoldus.in", 
        "subject": "Watcher Notification - LOW FREE HARD DISK ON SATURN ES",
        "body": "Nodes with LOW FREE HARD DISK (BELOW 100):\n\n{{#ctx.payload._value}}\"{{key}}\" - FREE HARD DISK is {{free_disk.value}}%\n{{/ctx.payload._value}}"
      }
    }
  }
}

Here I am doing the aggregation over the field 'source_node.ip', but I want the key formed to be something like 'source_node.ip + source_node.name'.

I tried something like:

"aggs": {
                "nodes": {
                  "terms": {
                    "script": "docs['source_node.ip'] + docs['source_node.name']",
                    "order": {
                      "free_disk": "desc"
                    }
                  },
                  .
                  . // sub aggregations
                  .
                 }
              }

But this results in an error saying that no field source_node was found.
How can I achieve the above-mentioned scenario?

Use `doc` instead of `docs`, and make sure you are accessing a field that is not analyzed. See this example:

DELETE foo

PUT foo/doc/1?refresh=
{
  "source_node": {
    "ip": "8.8.8.8",
    "name": "node_name"
  }
}

GET foo/doc/_search
{
  "size": 0,
  "aggs": {
    "scripted": {
      "terms": {
        "script": {
          "source" : "doc['source_node.name.keyword'].value + '_-_' + doc['source_node.ip.keyword'].value"
        }, 
        "size": 10
      }
    }
  }
}

Hope this helps.

@spinscale
I did try the solution you suggested, but it fails with the following message:
failed to execute watch input
And the reason for the failure is:
SearchPhaseExecutionException[all shards failed]; nested: ScriptException[compile error]; nested: IllegalArgumentException[Variable [source_node] is not defined.];
The error is the same as before.

Any pointers on this, @spinscale?

This topic was automatically closed 28 days after the last reply. New replies are no longer allowed.