Watcher input extract fields

I am working towards creating a watcher that triggers a webhook when my JVM heap usage crosses a particular threshold.
I have an http input that requests the node stats from the cluster. Currently the entire response is added to the watcher context payload.

"input" : {
  "http" : {
    "request" : {
      "host" : "localhost",
      "port" : 9200,
      "path" : "/_nodes/stats/jvm"
    }
  }
}

I intend to use only nodes.#nodeName#.jvm.mem.heap_used_percent and nodes.#nodeName#.name for my conditions and actions.

How would I use the extract attribute to retrieve only these fields when I don't know the JSON property name beforehand?

Why not install Marvel and then use the example watch here - https://www.elastic.co/guide/en/watcher/current/watching-marvel-data.html#watching-memory-usage

Hey,

The extract requires the exact field name. You could use filter_path to reduce the JSON being returned, but you would still need a transform to remove the changing node ids and put the data into an array for easy comparison (i.e. using the array compare condition to check whether any of the heaps is above a certain percentage).

--Alex
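
For anyone following along, here is a rough sketch of the flattening logic such a transform would need. It is plain Python, not a Watcher transform script, and only illustrates the idea: drop the changing node ids, build an array, then compare each heap value against a threshold. The function name, the sample node ids, and the threshold value are all made up for illustration.

```python
def hot_nodes(node_stats, threshold):
    """Flatten the per-node-id map into a list and keep nodes at or above threshold."""
    flattened = [
        {
            "name": node["name"],
            "heap_used_percent": node["jvm"]["mem"]["heap_used_percent"],
        }
        # The keys of "nodes" are node ids that change per cluster, so ignore them.
        for node in node_stats.get("nodes", {}).values()
    ]
    return [n for n in flattened if n["heap_used_percent"] >= threshold]


# Hypothetical response shape after filter_path trimming; node ids are invented.
sample = {
    "nodes": {
        "aBcD": {"name": "node-1", "jvm": {"mem": {"heap_used_percent": 82}}},
        "eFgH": {"name": "node-2", "jvm": {"mem": {"heap_used_percent": 41}}},
    }
}

print(hot_nodes(sample, 75))
```

In a real watch the same steps would live in a transform plus a condition; the Python above is only meant to show the shape of the data before and after.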

Thanks, Alex. This seems to do the trick:

 "input": {
      "http": {
         "request": {
            "host": "localhost",
            "port": 9200,
            "path": "/_nodes/stats/jvm",
            "params": {
               "filter_path": "nodes.**.name,nodes.**.jvm.mem.heap_used_percent"
            }
         }
      }
   }

I am having trouble using "extract" in an input.http.request. Can someone share more examples?

My issue is that the parser does not seem to like the "extract" syntax I provide. For example, I take nidheeshb's example and it runs fine. However, if I replace the "params" element with:
"extract": ["cluster_name"]
it appears the parser does not validate the request. It returns:
{
  "error": {
    "root_cause": [
      {
        "type": "parse_exception",
        "reason": "could not parse http request template. unexpected token [START_ARRAY] for field [extract]"
      }
    ],
    "type": "parse_exception",
    "reason": "could not parse [http] input for watch [filter_example]. failed to parse http request template",
    "caused_by": {
      "type": "parse_exception",
      "reason": "could not parse http request template. unexpected token [START_ARRAY] for field [extract]"
    }
  },
  "status": 400
}

Hey,

can you provide the full watch, so others can debug?

--Alex

Yes, I have been trying to run this, which is similar to what I really want to do with my data:

PUT http://{{host}}/_watcher/watch/filter_example
{
  "trigger" : {
    "schedule" : { "interval" : "60s" } 
  },
  "input": {
      "http": {
         "request": {
            "host": "localhost",
            "port": 9200,
            "path": "/_nodes/stats/jvm",
            "extract": ["cluster_name"] 
         }
      }
   },
  "actions": {
    "log": {
      "logging": {
        "text": "Watcher ran {{ctx.watch_id}} at {{ctx.execution_time}} with payload {{ctx.payload}}"
      }
    }
  }  
}

Hey,

try this for your input

"input": {
      "http": {
         "request": {
            "host": "localhost",
            "port": 9200,
            "path": "/_nodes/stats/jvm"
         },
         "extract": ["cluster_name"] 
      }
   },

--Alex
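
To make the difference between the two snippets explicit, here is a small Python sketch that detects an "extract" placed inside input.http.request and lifts it up one level, next to "request". The helper names are hypothetical and this is not part of Watcher; it only mirrors the structural change shown above.

```python
def extract_is_misplaced(watch):
    """True if "extract" sits inside input.http.request instead of input.http."""
    http_input = watch.get("input", {}).get("http", {})
    return "extract" in http_input.get("request", {})


def move_extract(watch):
    """Return a copy of the watch with "extract" moved next to "request"."""
    http_input = dict(watch["input"]["http"])
    request = dict(http_input["request"])
    if "extract" in request:
        http_input["extract"] = request.pop("extract")
    http_input["request"] = request
    fixed = dict(watch)
    fixed["input"] = {**watch["input"], "http": http_input}
    return fixed


# The input section from the failing watch in this thread.
broken = {
    "input": {
        "http": {
            "request": {
                "host": "localhost",
                "port": 9200,
                "path": "/_nodes/stats/jvm",
                "extract": ["cluster_name"],
            }
        }
    }
}

fixed = move_extract(broken)
print(extract_is_misplaced(broken), extract_is_misplaced(fixed))
```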

Alex, thank you. You have fixed up the example and now that extract works for me. Can you share the docs or the code that has the schema for watcher?

On to the next question. I am trying to run a very similar watch, but the http request to the host is an aggregation, so it requires a put method and a body. Despite expecting an aggregation JSON response, none of my extract keys returns any data. I suspect my aggregation is not running correctly from the watch, even though it runs correctly when submitted directly to Elasticsearch.

Is there any way to have the extract return the entire response so I can debug it and fix my watch input http request?

Thanks, beckerdo

Hey Dan,

See this link for the docs. Also, if you scroll up/down a bit you will see the allowed parameters for the search/http input; those that need to be put inside the request are named request.*.

If you run an aggregation, you might want to use the search input instead of the http one.

Regardless, if you need to debug, you can use the Execute Watch API, which returns what is passed along in each step. That should help you figure things out.

Hope this helps!

--Alex
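
As a rough illustration of the Execute Watch API call, here is a Python sketch that builds (but does not send) the request and pulls the input payload out of a result object. It assumes the Watcher 2.x endpoint POST /_watcher/watch/<id>/_execute; the base URL and helper names are placeholders, and input_payload expects whichever object in the response holds the "result" key.

```python
import urllib.request


def build_execute_request(base_url, watch_id):
    """Build a POST request for the Execute Watch API without sending it."""
    url = f"{base_url.rstrip('/')}/_watcher/watch/{watch_id}/_execute"
    return urllib.request.Request(
        url,
        data=b"{}",  # empty body: execute the stored watch as-is
        method="POST",
        headers={"Content-Type": "application/json"},
    )


def input_payload(record):
    """Return result.input.payload from the object holding "result", or None."""
    return record.get("result", {}).get("input", {}).get("payload")


req = build_execute_request("http://localhost:9200", "filter_example")
print(req.get_method(), req.full_url)
# To actually run it: urllib.request.urlopen(req), then json.load() the response.
```

An empty input payload in the result points at the request itself rather than at the extract keys.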

Once again thanks for the help!

Thanks for the doc link. I was reading it incorrectly and putting "extract" into "input.http.request" rather than "input.http".

The reason I am not using the search input is that this watch runs on a different machine than the one with the data I am aggregating. Is there a way to use the search input on a different machine than the http.request ?

Finally, thanks for the Execute Watch API. It is helpful, but I am seeing the same issue. Whether I extract "aggregations" or "hits" or "cluster_name", I see no data extracted to the context payload at ctx.payload. In other words, this action:

  "actions": {
    "log": {
      "logging": {
        "text": "Watcher ran {{ctx.watch_id}} at {{ctx.execution_time}} with payload {{ctx.payload}}"
      }
    } 

shows this message (note the payload is empty):

      "actions": [
        {
          "id": "log",
          "type": "logging",
          "status": "success",
          "logging": {
            "logged_text": "Watcher ran cal_tpv_agg_watch at 2016-03-09T17:13:15.871Z with payload {}"
          }
        }

I notice the http.request returned a status code of 200, so that seems like it worked. But perhaps there is a way to extract the entire response into the watch context?

Hey,

The execute watch API should help you see what's going on, as you can see the output. When I played around with the cluster_name extract, this showed up in the output:

"result": {
      "execution_time": "2016-03-09T18:06:53.470Z",
      "execution_duration": 58,
      "input": {
        "type": "http",
        "status": "success",
        "payload": {
          "cluster_name": "elasticsearch"
        },
  ...

So, you can check if there is anything in the output. Given that your logging message is empty, we should first check, without specifying extract, whether anything is returned.

If so, feel free to post the full example (including data) for reproduction.

--Alex

If I execute the watch without extract, I see the payload contains the following:

          "logging": {
            "logged_text": "Watcher ran cal_tpv_agg_watch at 2016-03-09T18:53:54.499Z with payload {_index=cal-root-2016-03-07, _type=logs, _id=_search?search_type=count, _version=157, created=false}"
          }

The index, type, and id/API look correct. That is the same whether I run a request directly to the server or through a watch body. However, when run on the server, the response contains such elements as "took", "hits", and "aggregations".

My aggregation looks something like this, where m_id and amt are integers and the aggregation adds up the amts and buckets them by id. It will be hard to share the data, since I have to remove company-specific and non-essential things, or come up with some mock data.

{
   "aggs": {
        "tpv": {
            "terms": {
                "field": "payload.id",
                 "order" : { "total_volume" : "desc" }
            },
            "aggs": { 
                "total_volume": {
                   "sum": {
                        "field": "payload.amt"
                    }
                }
            }
        }
    }
} 

Does the response payload provide any insight as to what I am doing wrong?
Thanks, beckerdo

Hey,

That's a classic reason why I always ask for the whole watch :slightly_smiling:

Take a closer look at the returned data:

{
_index=cal-root-2016-03-07, 
_type=logs, 
_id=_search?search_type=count, 
_version=157, created=false
}

This is the typical response when a document has been indexed. Is it possible that you are using PUT instead of a GET operation?

--Alex

That sounds plausible. The watch is created with a PUT. However it does not matter if the watch body input.http.request.method is omitted, "get", "post", or "put": the watch execute returns the same response.

Here is the entire watch, sanitized:

{
  "trigger" : {
    "schedule" : { "interval" : "3600s" } 
  },
  "input" : {
    "http" : {
      "request" : {
       "host" : "elasticserver.dev.mycorp.com",
       "port" : 9200,
       "path" : "/cal-root-2016-03-07/logs/_search?search_type=count",
       "body" : "{ \"aggs\": { \"tpv\": { \"terms\": { \"field\": \"payload.id\", \"order\" : { \"total_volume\" : \"desc\" } }, \"aggs\": {  \"total_volume\": { \"sum\": { \"field\": \"payload.amt\" }}}}}}"
      }
    }
  },
  "actions": {
    "log": {
      "logging": {
        "text": "Watcher ran {{ctx.watch_id}} at {{ctx.execution_time}} with payload {{ctx.payload}}"
      }
    }
  }  
}

Would the HTTP client have something to do with this? I usually use POSTMAN, which does not allow bodies for GET requests. Most of the elastic GET requests that require bodies work with PUT requests in POSTMAN. However, I do not think that applies here, since the watch has the request verb and body explicitly listed in the JSON.

The payload in the index looks something like this:

{
          "payload": {
            "amt": "14820",
            "id": "5"
          }
}

We usually randomly vary the id from 1 to 10 and the amount from 500 to 20000 so we can bucket some aggregations.

Hey,

Can you try POST as the method and see if it works? Also, please use the request params attribute for the search_type. It might be that the query string is escaped wrongly when it is part of the path, so Elasticsearch tries to store this resource as a document.

Thanks!

--Alex
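
A small Python sketch of that suggestion: take a request definition whose path still carries a query string, and split it into a clean "path" plus a "params" object. The helper name is hypothetical; it only rewrites the dictionary and does not talk to Elasticsearch.

```python
from urllib.parse import parse_qsl, urlsplit


def split_query(request):
    """Return a copy of the request with the query string moved into "params"."""
    parts = urlsplit(request["path"])
    fixed = dict(request, path=parts.path)
    if parts.query:
        params = dict(request.get("params", {}))
        params.update(parse_qsl(parts.query))
        fixed["params"] = params
    return fixed


# The request from the watch above, without the body for brevity.
original = {
    "host": "elasticserver.dev.mycorp.com",
    "port": 9200,
    "path": "/cal-root-2016-03-07/logs/_search?search_type=count",
}

fixed_request = split_query(original)
fixed_request["method"] = "post"  # the other half of the suggestion
print(fixed_request["path"], fixed_request["params"])
```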

OH YEA! THANKS! It appears that breaking the query params out into a separate element worked!

For posterity, here is a summary of what worked for my aggregation:
PUT http://{{host}}/_watcher/watch/cal_tpv_agg_watch

{
  "trigger" : {
    "schedule" : { "interval" : "3600s" } 
  },
  "input" : {
    "http" : {
      "request" : {
       "host" : "elasticserver.dev.mycorp.com",
       "port" : 9200,
       "headers": {
            "Content-Type" : "application/json",
            "Accept" : "application/json"
       },
       "params": {
            "search_type" : "count"
        }, 
       "path" : "/cal-root-2016-03-07/logs/_search",
       "method": "post",
       "body" : "{ \"aggs\": { \"tpv\": { \"terms\": { \"field\": \"payload.m_merchant_account_number\", \"order\" : { \"total_volume\" : \"desc\" } }, \"aggs\": {  \"total_volume\": { \"sum\": { \"field\": \"payload.usd_amt\" }}}}}}"
      }
    }
  },
  "actions": {
    "log": {
      "logging": {
        "text": "Watcher ran {{ctx.watch_id}} at {{ctx.execution_time}} with payload {{ctx.payload}}"
      }
    }
  }  
}

It is now returning the aggregation response in the watcher response payload.