How do I include ctx.payload._value in the email text of a Watcher alert?

Hello All,
I am trying to create a watcher alert for servers that communicated with Elasticsearch in the last 48 hours but not in the last 35 minutes. I want to include the ctx.payload._value result in my email text, but all I am getting is a blank value.
My watcher JSON file looks like this:

{
    "input": {
        "search": {
            "request": {
                "search_type": "query_then_fetch",
                "indices": [
                    "metricbeat123-*"
                ],
                "rest_total_hits_as_int": true,
                "body": {
                    "query": {
                        "range": {
                            "@timestamp": {
                                "gte": "now-{{ctx.metadata.window_period}}"
                            }
                        }
                    },
                    "aggs": {
                        "periods": {
                            "filters": {
                                "filters": {
                                    "history": {
                                        "range": {
                                            "@timestamp": {
                                                "gte": "now-{{ctx.metadata.window_period}}"
                                            }
                                        }
                                    },
                                    "last_period": {
                                        "range": {
                                            "@timestamp": {
                                                "gte": "now-{{ctx.metadata.last_period}}"
                                            }
                                        }
                                    }
                                }
                            },
                            "aggs": {
                                "hosts": {
                                    "terms": {
                                        "field": "server.name",
                                        "size": 10000
                                    }
                                }
                            }
                        }
                    },
                    "size": 0
                }
            }
        }
    },
    "condition": {
        "script": {
            "source": "return ctx.payload.aggregations.periods.buckets.history.hosts.buckets.size() > ctx.payload.aggregations.periods.buckets.last_period.hosts.buckets.size();",
            "lang": "painless"
        }
    },
    "actions": {
        "log": {
            "transform": {
                "script": {
                    "source": "def last_period=ctx.payload.aggregations.periods.buckets.last_period.hosts.buckets.stream().map(e -> e.key).collect(Collectors.toList()); return ctx.payload.aggregations.periods.buckets.history.hosts.buckets.stream().map(e -> e.key).filter(p -> !last_period.contains(p)).collect(Collectors.toList());",
                    "lang": "painless"
                }
            },
            "logging": {
                "level": "info",
                "text": "Systems not responding in the last {{ctx.metadata.last_period}} minutes:{{#ctx.payload._value}}{{.}}:{{/ctx.payload._value}}"
            }
        },
        "email_administrator": {
            "email": {
                "profile": "standard",
                "attachments": {
                    "attached_data": {
                        "data": {
                            "format": "json"
                        }
                    }
                },
                "from": "abc@xyx.com",
                "to": [
                    "test123@xyz.com"
                ],
                "subject": "Encountered {{ctx.payload.hits.total}} Metricbeat Agents not reporting in last 35 mins",
                "body": {
                    "text": "{{#ctx.payload._value}}{{.}}:{{/ctx.payload._value}}"
                }
            }
        }
    },
    "metadata": {
        "last_period": "35m",
        "window_period": "48h"
    },
    "throttle_period_in_millis": 300000
}

When I simulate the watch, I get this result:

{
    "watch_id": "_someid_",
    "node": "abc123",
    "state": "executed",
    "user": "testuser",
    "status": {
      "state": {
        "active": true,
        "timestamp": "2021-04-09T04:11:04.692Z"
      },
      "last_checked": "2021-04-09T04:11:04.692Z",
      "last_met_condition": "2021-04-09T04:11:04.692Z",
      "actions": {
        "log": {
          "ack": {
            "timestamp": "2021-04-09T04:11:04.692Z",
            "state": "ackable"
          },
          "last_execution": {
            "timestamp": "2021-04-09T04:11:04.692Z",
            "successful": true
          },
          "last_successful_execution": {
            "timestamp": "2021-04-09T04:11:04.692Z",
            "successful": true
          }
        },
        "email_administrator": {
          "ack": {
            "timestamp": "2021-04-09T04:11:04.692Z",
            "state": "ackable"
          },
          "last_execution": {
            "timestamp": "2021-04-09T04:11:04.692Z",
            "successful": true
          },
          "last_successful_execution": {
            "timestamp": "2021-04-09T04:11:04.692Z",
            "successful": true
          }
        }
      },
      "execution_state": "executed",
      "version": -1
    },
    "trigger_event": {
      "type": "manual",
      "triggered_time": "2021-04-09T04:11:04.692Z",
      "manual": {
        "schedule": {
          "scheduled_time": "2021-04-09T04:11:04.692Z"
        }
      }
    },
    "input": {
      "search": {
        "request": {
          "search_type": "query_then_fetch",
          "indices": [
            "metricbeat123-*"
          ],
          "rest_total_hits_as_int": true,
          "body": {
            "query": {
              "range": {
                "@timestamp": {
                  "gte": "now-{{ctx.metadata.window_period}}"
                }
              }
            },
            "aggs": {
              "periods": {
                "filters": {
                  "filters": {
                    "history": {
                      "range": {
                        "@timestamp": {
                          "gte": "now-{{ctx.metadata.window_period}}"
                        }
                      }
                    },
                    "last_period": {
                      "range": {
                        "@timestamp": {
                          "gte": "now-{{ctx.metadata.last_period}}"
                        }
                      }
                    }
                  }
                },
                "aggs": {
                  "hosts": {
                    "terms": {
                      "field": "server.name",
                      "size": 10000
                    }
                  }
                }
              }
            },
            "size": 0
          }
        }
      }
    },
    "condition": {
      "script": {
        "source": "return ctx.payload.aggregations.periods.buckets.history.hosts.buckets.size() > ctx.payload.aggregations.periods.buckets.last_period.hosts.buckets.size();",
        "lang": "painless"
      }
    },
    "metadata": {
      "last_period": "35m",
      "window_period": "48h",
      "name": "testwatcher",
      "xpack": {
        "type": "json"
      }
    },
    "result": {
      "execution_time": "2021-04-09T04:11:04.692Z",
      "execution_duration": 2192,
      "input": {
        "type": "search",
        "status": "success",
        "payload": {
          "_shards": {
            "total": 14,
            "failed": 0,
            "successful": 14,
            "skipped": 6
          },
          "hits": {
            "hits": [],
            "total": 10000,
            "max_score": null
          },
          "took": 2187,
          "timed_out": false,
          "aggregations": {
            "periods": {
              "meta": {},
              "buckets": {
                "last_period": {
                  "doc_count": 157223,
                  "hosts": {
                    "doc_count_error_upper_bound": 0,
                    "sum_other_doc_count": 0,
                    "buckets": [
                      {
                        "doc_count": 1234,
                        "key": "server1"
                      },
                      {
                        "doc_count": 1785,
                        "key": "server2"
                      },
                                            {
                        "doc_count": 1111,
                        "key": "server3"
                      },
                      {
                        "doc_count": 1254,
                        "key": "server4"
                      }
                    ]
                  }
                },
                "history": {
                  "doc_count": 789564125,
                  "hosts": {
                    "doc_count_error_upper_bound": 0,
                    "sum_other_doc_count": 0,
                    "buckets": [
                      {
                        "doc_count": 895412,
                        "key": "server1"
                      },
                      {
                        "doc_count": 654123,
                        "key": "server2"
                      },
                      {
                        "doc_count": 123654,
                        "key": "server3"
                      },
                      {
                        "doc_count": 789654,
                        "key": "server4"
                      },
                      {
                        "doc_count": 96324,
                        "key": "server5"
                      },
                      {
                        "doc_count": 195752,
                        "key": "server6"
                      },
                      {
                        "doc_count": 98521,
                        "key": "server7"
                      }
                    ]
                  }
                }
              }
            }
          }
        },
        "search": {
          "request": {
            "search_type": "query_then_fetch",
            "indices": [
              "metricbeat123-*"
            ],
            "rest_total_hits_as_int": true,
            "body": {
              "query": {
                "range": {
                  "@timestamp": {
                    "gte": "now-48h"
                  }
                }
              },
              "aggs": {
                "periods": {
                  "filters": {
                    "filters": {
                      "history": {
                        "range": {
                          "@timestamp": {
                            "gte": "now-48h"
                          }
                        }
                      },
                      "last_period": {
                        "range": {
                          "@timestamp": {
                            "gte": "now-35m"
                          }
                        }
                      }
                    }
                  },
                  "aggs": {
                    "hosts": {
                      "terms": {
                        "field": "server.name",
                        "size": 10000
                      }
                    }
                  }
                }
              },
              "size": 0
            }
          }
        }
      },
      "condition": {
        "type": "script",
        "status": "success",
        "met": true
      },
      "actions": [
        {
          "id": "log",
          "type": "logging",
          "status": "simulated",
          "transform": {
            "type": "script",
            "status": "success",
            "payload": {
              "_value": [
                "server4",
                "server5",
                "server6",
                "server7"
              ]
            }
          },
          "logging": {
            "logged_text": "Systems not responding in the last 35 minutes:server4:server5:server6:server7"
          }
        },
        {
          "id": "email_administrator",
          "type": "email",
          "status": "simulated",
          "email": {
            "message": {
              "id": "someid",
              "from": "abc@xyz.com",
              "sent_date": "2021-04-09T04:11:06.886335006Z",
              "to": [
                "test123@xyz.com"
              ],
              "subject": "Systems not responding in the last 35m minutes",
              "body": {
                "text": ""
              }
            }
          }
        }
      ]
    },
    "messages": []
  }

How can I get the server4, server5, server6, server7 in my email text?

You will have to iterate through the buckets object.

Something like this:

> def hosts = ctx.payload.aggregations.hosts.buckets; def data = [];for (def i=0 ; i < hosts.length ; i++) {
> 
> //your extra logic
> }

Thank you Rajsolanki. But can you please elaborate on your comment? Where should I add the logic that you provided?

You can add it in the transform.

"transform": {
        "script": {
          "source": "",
          "lang": "painless"
        }
      }

I made the following change:

"actions": {
    "log": {
      "transform": {
        "script": {
          "source": "def hosts = ctx.payload.aggregations.hosts.buckets; def data = [];for (def i=0 ; i < hosts.length ; i++);",
          "lang": "painless"
        }
      },

and I got this error when I ran the simulation:

"actions": [
      {
        "id": "log",
        "type": "logging",
        "status": "failure",
        "transform": {
          "type": "script",
          "status": "failure",
          "reason": "runtime error",
          "error": {
            "root_cause": [
              {
                "type": "script_exception",
                "reason": "runtime error",
                "script_stack": [
                  "hosts = ctx.payload.aggregations.hosts.buckets; def ",
                  "                                      ^---- HERE"
                ],
                "script": "def hosts = ctx.payload.aggregations.hosts.buckets; def data = [];for (def i=0 ; i < hosts.length ; i++);",
                "lang": "painless",
                "position": {
                  "offset": 42,
                  "start": 4,
                  "end": 56
                }
              }
            ],
            "type": "script_exception",
            "reason": "runtime error",
            "script_stack": [
              "hosts = ctx.payload.aggregations.hosts.buckets; def ",
              "                                      ^---- HERE"
            ],
            "script": "def hosts = ctx.payload.aggregations.hosts.buckets; def data = [];for (def i=0 ; i < hosts.length ; i++);",
            "lang": "painless",
            "position": {
              "offset": 42,
              "start": 4,
              "end": 56
            },
            "caused_by": {
              "type": "null_pointer_exception",
              "reason": "Cannot invoke \"Object.getClass()\" because \"callArgs[0]\" is null"
            }
          }
        },
        "reason": "Failed to transform payload"
      },

I think your JSON object is in your result. My guess is that it's ctx.payload.aggregations.periods.buckets.

If not, look at your result and construct the path from it. Along with that, you need to extract the value of each element and assign it to a variable.

Try this:

def hosts = ctx.payload.aggregations.periods.buckets; 
def data = [];
for (def i=0 ; i < hosts.length ; i++) 
{def host = hosts[i];
def buckets = host.buckets.stream().map(it -> ['Server':host.key]).collect(Collectors.toList());data.addAll(buckets);
}
return data

It's giving me an error again: "Invalid JSON".

"actions": {
    "log": {
      "transform": {
        "script": {
          "source": "def hosts = ctx.payload.aggregations.periods.buckets; 
					def data = [];
					for (def i=0 ; i < hosts.length ; i++) 
					{def host = hosts[i];
					def buckets = host.buckets.stream().map(it -> ['Server':host.key]).collect(Collectors.toList());data.addAll(buckets);
					}
					return data;",
          "lang": "painless"
        }
      },

You need to close your "source" string :slight_smile:

Please see this as an example: Running an action for each element in an array | Elasticsearch Guide [7.12] | Elastic

My bad, but I am still not getting it:

"actions": {
    "log": {
      "transform": {
        "script": {
          "source": "def hosts = ctx.payload.aggregations.periods.buckets;",
					def data = [];
					for (def i=0 ; i < hosts.length ; i++) 
					{def host = hosts[i];
					def buckets = host.buckets.stream().map(it -> ['Server':host.key]).collect(Collectors.toList());data.addAll(buckets);
					}
					return data
          "lang": "painless"
        }
      },

This throws a "bad string" and an "invalid JSON" error message :expressionless:

Please paste your entire watcher here.

This is how my watcher looks now:

{
    "input": {
        "search": {
            "request": {
                "search_type": "query_then_fetch",
                "indices": [
                    "metricbeat123-*"
                ],
                "rest_total_hits_as_int": true,
                "body": {
                    "query": {
                        "range": {
                            "@timestamp": {
                                "gte": "now-{{ctx.metadata.window_period}}"
                            }
                        }
                    },
                    "aggs": {
                        "periods": {
                            "filters": {
                                "filters": {
                                    "history": {
                                        "range": {
                                            "@timestamp": {
                                                "gte": "now-{{ctx.metadata.window_period}}"
                                            }
                                        }
                                    },
                                    "last_period": {
                                        "range": {
                                            "@timestamp": {
                                                "gte": "now-{{ctx.metadata.last_period}}"
                                            }
                                        }
                                    }
                                }
                            },
                            "aggs": {
                                "hosts": {
                                    "terms": {
                                        "field": "server.name",
                                        "size": 10000
                                    }
                                }
                            }
                        }
                    },
                    "size": 0
                }
            }
        }
    },
    "condition": {
        "script": {
            "source": "return ctx.payload.aggregations.periods.buckets.history.hosts.buckets.size() > ctx.payload.aggregations.periods.buckets.last_period.hosts.buckets.size();",
            "lang": "painless"
        }
    },
    "actions": {
    "log": {
      "transform": {
        "script": {
          "source": "def hosts = ctx.payload.aggregations.periods.buckets;",
					def data = [];
					for (def i=0 ; i < hosts.length ; i++) 
					{def host = hosts[i];
					def buckets = host.buckets.stream().map(it -> ['Server':host.key]).collect(Collectors.toList());data.addAll(buckets);
					}
					return data
          "lang": "painless"
        }
      },
            "logging": {
                "level": "info",
                "text": "Systems not responding in the last {{ctx.metadata.last_period}} minutes:{{#ctx.payload._value}}{{.}}:{{/ctx.payload._value}}"
            }
        },
        "email_administrator": {
            "email": {
                "profile": "standard",
                "attachments": {
                    "attached_data": {
                        "data": {
                            "format": "json"
                        }
                    }
                },
                "from": "abc@xyx.com",
                "to": [
                    "test123@xyz.com"
                ],
                "subject": "Encountered {{ctx.payload.hits.total}} Metricbeat Agents not reporting in last 35 mins",
                "body": {
                    "text": "{{#ctx.payload._value}}{{.}}:{{/ctx.payload._value}}"
                }
            }
        }
    },
    "metadata": {
        "last_period": "35m",
        "window_period": "48h"
    },
    "throttle_period_in_millis": 300000
}

Your log action is already writing out all the servers. You just want the email to show the names of all the servers that are erroring. Try this email action:

"actions": {
    "servers_erroring": {
      "transform": {
        "script": {
          "source": "def hosts = ctx.payload.aggregations.periods.buckets; def data = [];for (def i=0 ; i < hosts.length ; i++) {def host = hosts[i];def buckets = host.buckets.stream().map(it -> ['HostName':hostname.key]).collect(Collectors.toList());data.addAll(buckets);}return data",
          "lang": "painless"
        }
      },
      "email": {
        "profile": "standard",
        "attachments": {
          "servers_erroing": {
            "data": {
              "format": "json"
            }
          }
        },
        "from": "abc@xyx.com",
        "to": [
          "test123@xyz.com"
        ],
        "subject": "Encountered {{ctx.payload.hits.total}} Metricbeat Agents not reporting in last 35 mins",
        "body": {
          "html": "<html><head><meta http-equiv='Content-Type' content='text/html; charset=utf-8' /></head><body><hr size='1' /><table><tbody><tr><th>HostName</th></tr> {{#ctx.payload._value}}<tr><td>{{HostName}}</td><td></td><td></td></tr> {{/ctx.payload._value}}</tbody></table></body></html>"
        }
      }
    }
  }

That should be:

['HostName':host.key]

Thank you for helping me on this.
Now I am getting a simulation error: "Failed to transform payload"

Watcher.json:

{
  "trigger": {
    "schedule": {
      "interval": "1h"
    }
  },
  "input": {
    "search": {
      "request": {
        "search_type": "query_then_fetch",
        "indices": [
          "metricbeat-*"
        ],
        "rest_total_hits_as_int": true,
        "body": {
          "query": {
            "range": {
              "@timestamp": {
                "gte": "now-{{ctx.metadata.window_period}}"
              }
            }
          },
          "aggs": {
            "periods": {
              "filters": {
                "filters": {
                  "history": {
                    "range": {
                      "@timestamp": {
                        "gte": "now-{{ctx.metadata.window_period}}"
                      }
                    }
                  },
                  "last_period": {
                    "range": {
                      "@timestamp": {
                        "gte": "now-{{ctx.metadata.last_period}}"
                      }
                    }
                  }
                }
              },
              "aggs": {
                "hosts": {
                  "terms": {
                    "field": "agent.name",
                    "size": 10000
                  }
                }
              }
            }
          },
          "size": 0
        }
      }
    }
  },
  "condition": {
    "script": {
      "source": "return ctx.payload.aggregations.periods.buckets.history.hosts.buckets.size() > ctx.payload.aggregations.periods.buckets.last_period.hosts.buckets.size();",
      "lang": "painless"
    }
  },
  "actions": {
    "servers_erroring": {
      "transform": {
        "script": {
          "source": "def hosts = ctx.payload.aggregations.periods.buckets; def data = [];for (def i=0 ; i < hosts.length ; i++) {def host = hosts[i];def buckets = host.buckets.stream().map(it -> ['HostName':host.key]).collect(Collectors.toList());data.addAll(buckets);}return data",
          "lang": "painless"
        }
      },
      "logging": {
        "level": "info",
        "text": "Systems not responding in the last {{ctx.metadata.last_period}} minutes:{{#ctx.payload._value}}{{.}}:{{/ctx.payload._value}}"
      }
    },
    "email_administrator": {
      "email": {
        "profile": "standard",
        "attachments": {
          "attached_data": {
            "data": {
              "format": "json"
            }
          }
        },
        "from": "abc@xyx.com",
        "to": [
          "test123@xyz.com"
        ],
        "subject": "Encountered {{ctx.payload.hits.total}} Metricbeat Agents not reporting in last 15 mins",
        "body": {
          "html": "<html><head><meta http-equiv='Content-Type' content='text/html; charset=utf-8' /></head><body><hr size='1' /><table><tbody><tr><th>HostName</th></tr> {{#ctx.payload._value}}<tr><td>{{HostName}}</td><td></td><td></td></tr> {{/ctx.payload._value}}</tbody></table></body></html>"
        }
      }
    }
  },
  "metadata": {
    "last_period": "35m",
    "window_period": "24h"
  },
  "throttle_period_in_millis": 300000
}

Simulation result:

"condition": {
      "type": "script",
      "status": "success",
      "met": true
    },
"actions": [
      {
        "id": "servers_erroring",
        "type": "logging",
        "status": "failure",
        "transform": {
          "type": "script",
          "status": "failure",
          "reason": "runtime error",
          "error": {
            "root_cause": [
              {
                "type": "script_exception",
                "reason": "runtime error",
                "script_stack": [
                  "i=0 ; i < hosts.length ; i++) {def ",
                  "               ^---- HERE"
                ],
                "script": "def hosts = ctx.payload.aggregations.periods.buckets; def data = [];for (def i=0 ; i < hosts.length ; i++) {def host = hosts[i];def buckets = host.buckets.stream().map(it -> ['HostName':host.key]).collect(Collectors.toList());data.addAll(buckets);}return d ...",
                "lang": "painless",
                "position": {
                  "offset": 92,
                  "start": 77,
                  "end": 112
                }
              }
            ],
            "type": "script_exception",
            "reason": "runtime error",
            "script_stack": [
              "i=0 ; i < hosts.length ; i++) {def ",
              "               ^---- HERE"
            ],
            "script": "def hosts = ctx.payload.aggregations.periods.buckets; def data = [];for (def i=0 ; i < hosts.length ; i++) {def host = hosts[i];def buckets = host.buckets.stream().map(it -> ['HostName':host.key]).collect(Collectors.toList());data.addAll(buckets);}return d ...",
            "lang": "painless",
            "position": {
              "offset": 92,
              "start": 77,
              "end": 112
            },
            "caused_by": {
              "type": "null_pointer_exception",
              "reason": "Cannot invoke \"Object.getClass()\" because \"rightObject\" is null"
            }
          }
        },
        "reason": "Failed to transform payload"
      },
      {
        "id": "email_administrator",
        "type": "email",
        "status": "simulated",
        "email": {
          "message": {
            "id": "email_admin_id",
            "from": "abc@xyz.com",
            "sent_date": "2021-04-09T22:25:34.479287016Z",
            "to": [
              "test123@xyz.com"
            ],
            "subject": "Encountered 10000 Metricbeat Agents not reporting in last 35 mins",
            "body": {
              "html": "<head></head><body><hr /><table><tbody><tr><th>HostName</th></tr></tbody></table></body>"
            }
          }
        }
      }
    ]
  },
  "messages": []
}

If you check my first simulation result, it does give the server names in the logging field as expected.

"logging": {
            "logged_text": "Systems not responding in the last 35 minutes:server4:server5:server6:server7"
          }
        },

Yeah, so use that object. Maybe ctx.payload.aggregations.periods.buckets.history.hosts

Tried adding:

"body": {
          "text": "Systems not responding in the last 35 minutes: {{ctx.payload.aggregations.periods.buckets.history.hosts}}"
        }

to the body of my email, and now I am getting a list of all the servers, even the ones that are reporting.

Well, you know more about your search query and its result, so just pick the right object to iterate through and map its values.