Watcher - how to set the doc_values condition for a query with a filter terms aggregation. (Detect lack of logs per hostname - use case)

Hello friends,

I wanted to create a few watchers that help us to detect when we are not receiving specific logs that are crucial for the monitoring chain.
It means, to not be receiving logs from specific servers.

For that purpose, I created a watcher that runs every minute, queries a specific index looking for a specific field (state.keyword) and specific servers by means of a filter term aggregation on the host.name.keyword field.
I do that because I know the complete list of servers and I need to detect whether we receive logs from all of them. If I use a group by aggregation and 0 logs from a specific server is received, that server wont be returned as result in the query. So that way the alert should not be helpful.

Then, since I want to detect 0 logs are received, I set a condition to detect when the doc.count is 0.

And here is where the issue comes to me :frowning:

I could find a good way to set the condition, so it is not getting the proper doc_values value.

This is the code of my watcher:

{
  "trigger": {
    "schedule": {
      "interval": "1m"
    }
  },
  "input": {
    "search": {
      "request": {
        "search_type": "query_then_fetch",
        "indices": ["aaa-*"],
        "rest_total_hits_as_int": true,
        "body": {
          "size": 0,
          "query": {
            "bool": {
              "must": {
                "range": {
                  "indexed": {
                    "from": "now-5m",
                    "to": "now",
                    "include_lower": true,
                    "include_upper": true,
                    "format": "epoch_millis",
                    "boost": 1
                  }
                }
              },
              "filter": {
                "exists": {"field": "state.keyword"}
              }
            }
          },
          "aggs": {
            "MSEs": {
            "filters": {
              "filters": {
                "HOST1": { "term": {"host.name.keyword": "HOST1"}},
                "HOST2": { "term": {"host.name.keyword": "HOST2"}},
                "HOST3": { "term": {"host.name.keyword": "HOST3"}},
                "HOST4": { "term": {"host.name.keyword": "HOST4"}}
                }
              }
            }
          }
        }
      }
    }
  },
   "condition": {
    "array_compare": {
      "ctx.payload.aggregations.MSEs.buckets.filters": {
        "path": "doc_count.value",
        "eq": {
          "value": 0
        }
      }
    }
  },
  "actions": {
    "from_ESwatcher_to_Opsgenie" : {
    "webhook" : {
        "scheme" : "https",
        "method" : "POST",
        "host" : "api.opsgenie.com",
        "port" : 443,
        "path" : "/v1/json/eswatcher",
        "headers" : {
            "Content-Type" : "application/json"
        },
        "params": {
            "apiKey": "xxxxxxxxxxxx"
        },
        "body" : "{{#toJson}}ctx{{/toJson}}"
    }
    }
  }
}

If the input is run as a search query, it retrieves good values:

GET XXXXX*/_search
{
  "query":
  {
    "bool": { 
      "must": {
        "range": 
        { 
          "indexed": {
            "from": "now-5m",
            "to": "now",
            "include_lower": true,
            "include_upper": true,
            "format": "epoch_millis",
            "boost": 1
          }
        }
      },
      "filter":
        { 
          "exists": 
          { 
            "field": "state.keyword"
          }
        }
    }
  },
  "aggs": {
    "MSEs": {
      "filters": {
        "filters": {
          "HOST1": { "term": {"host.name.keyword": "HOST1"}},
          "HOST2": { "term": {"host.name.keyword": "HOST2"}},
          "HOST3": { "term": {"host.name.keyword": "HOST3"}},
          "HOST4": { "term": {"host.name.keyword": "HOST4"}}
          }
        }
      }
    }
}

The result of the query used in the INPUT seems to be good:

  "aggregations" : {
    "MSEs" : {
      "buckets" : {
        "HOST1" : {
          "doc_count" : 67
        },
        "HOST2" : {
          "doc_count" : 0
        },
        "HOST3" : {
          "doc_count" : 0
        },
        "HOST4" : {
          "doc_count" : 1
        }
        }
      }
    }
  }

But when I simulate the execution of the watcher, I got this exception:

exception": {
    "type": "illegal_state_exception",
    "reason": "array path ctx.payload.aggregations.MSEs did not evaluate to array, was {buckets={HOST1={doc_count=55}, HOST2={doc_count=54}, HOST3={doc_count=55}, HOST4={doc_count=54)"
  }

I tried several ways with no success:

  • path = doc_count. && ctx.payload.aggregations.MSEs.buckets
  • ctx.payload.aggregations.MSEs.buckets.filters
  • ctx.payload.aggregations.MSEs

Even modifying the condition as a painless script:

  "condition": {
    "script": {
      "lang" : "painless",
      "source" : "return ctx.payload.aggregations.MSEs.buckets.size() == 0"
     }
  },

But I did not succeed to get the proper result.

Which should be the way to refer to de "doc_values" returned by the input, when it comes to an array?

Thanks in advance for your help!

Best regards,
M

Hi there,

I've been trying different things with no success.
What I realized is that I was trying to loop over the results of the input query as in was an array, when actually the aggregations return a bucket of buckets in key:value format, it means, a HashMap. So I cannot use array_compare conditions or trying to loop the result as it is an array list.

Which should be the correct approach to loop over a hashmap?

The idea is to evaluate the doc_count value, and in case it is 0 (no logs are received) raise an alarm returning the hostname affected.

For instance, this was my last try with no success:

"condition": {
  "script":
  """
  return ctx.payload.aggregations.MSEs.buckets.stream()
  .filter(MSE -> MSE.doc_count.value < 1)
  """
}

It returns an exception:

  "exception": {
    "type": "script_exception",
    "reason": "runtime error",
    "script_stack": [
      "return ctx.payload.aggregations.MSEs.buckets.stream()\n  .filter(",
      "                                            ^---- HERE"
    ],
    "script": "\n  return ctx.payload.aggregations.MSEs.buckets.stream()\n  .filter(MSE -> MSE.doc_count.value < 1)\n  ",
    "lang": "painless",
    "position": {
      "offset": 47,
      "start": 3,
      "end": 67
    },
    "caused_by": {
      "type": "illegal_argument_exception",
      "reason": "dynamic method [java.util.HashMap, stream/0] not found",
      "stack_trace": "java.lang.IllegalArgumentException: dynamic method [java.util.HashMap, stream/0] not found\n\tat org.elasticsearch.painless.Def.lookupMethod(Def.java:205)\n\tat

Thanks in advance!!!

Regards,
M

Hi there,
Solution found here, if it helps to anyone:

    "condition": {
      "script": {
        "source": """
ctx.payload.aggregations.MSEs.buckets.entrySet().stream().filter(e-> e.value.doc_count == 0).count() > 0
"""
      }
    },
    "transform" : {
      "script":
      """
        return [
          'only_ones': ctx.payload.aggregations.MSEs.buckets.entrySet().stream()
          .filter(e-> e.value.doc_count == 0).map(e-> [ 'key': e.key, 'value': e.value.doc_count ]).collect(Collectors.toList())
          ]
      """
    },
1 Like

This topic was automatically closed 28 days after the last reply. New replies are no longer allowed.