Well, thanks again. That gave me the push in the right direction. It took me a few wrong turns, and I ended up with this:
Look back two minutes in the beats monitoring index, bucket-aggregate by hostname, return stats for the event-dropped counter, and subtract the minimum counter value from the maximum. That gives the number of dropped events per host for those two minutes.
POST /_watcher/watch/filebeat_dropped_events
{
  "trigger": {
    "schedule": {
      "interval": "2m"
    }
  },
  "input": {
    "search": {
      "request": {
        "indices": [
          ".monitoring-beats*"
        ],
        "body": {
          "query": {
            "bool": {
              "filter": [
                {
                  "range": {
                    "timestamp": {
                      "gte": "now-2m",
                      "lte": "now"
                    }
                  }
                },
                {
                  "match_phrase": {
                    "beats_stats.beat.type": "filebeat"
                  }
                }
              ]
            }
          },
          "size": 0,
          "aggs": {
            "hosts": {
              "composite": {
                "sources": [
                  {
                    "host": {
                      "terms": {
                        "field": "beats_stats.beat.host"
                      }
                    }
                  }
                ]
              },
              "aggs": {
                "event_stats": {
                  "stats": {
                    "field": "beats_stats.metrics.libbeat.output.events.dropped"
                  }
                }
              }
            }
          }
        }
      }
    }
  },
"condition": {
"script": """
ctx.payload.aggregations.hosts.buckets.stream().filter( el -> el.event_stats.max != el.event_stats.min ).collect(Collectors.toList());
return(ctx.payload.aggregations.hosts.buckets.length > 0)
"""
},
"actions": {
"log": {
"transform": {
"script": """
def out=ctx.payload.aggregations.hosts.buckets.stream().filter( el -> el.event_stats.max != el.event_stats.min ).map( el -> ['name': el.key.host, 'dropped': el.event_stats.max - el.event_stats.min ] ).collect(Collectors.toList());
return(out)
"""
},
"logging": {
"text": "{{#ctx.payload._value}}host {{name}} lost {{dropped}} events. {{/ctx.payload._value}}"
}
}
}
}
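To test the watch without waiting for the two-minute schedule, the execute watch API runs it on demand and returns the full execution record, including the condition result and the transformed payload:
POST /_watcher/watch/filebeat_dropped_events/_execute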
In the production setup it also sends chat messages via a webhook action, but for demonstration purposes logging is sufficient.
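For reference, such a webhook action could look roughly like the sketch below. The host and path are made-up placeholders, the exact body format depends on the chat product, and the action carries its own copy of the transform so that ctx.payload._value is populated for it as well:
"chat": {
  "transform": {
    "script": """
      // same per-host delta script as in the logging action
      return ctx.payload.aggregations.hosts.buckets.stream()
        .filter(el -> el.event_stats.max != el.event_stats.min)
        .map(el -> ['name': el.key.host, 'dropped': el.event_stats.max - el.event_stats.min])
        .collect(Collectors.toList());
    """
  },
  "webhook": {
    "scheme": "https",
    "host": "chat.example.org",
    "port": 443,
    "path": "/hooks/dropped-events",
    "method": "post",
    "headers": {
      "Content-Type": "application/json"
    },
    "body": "{\"text\": \"{{#ctx.payload._value}}host {{name}} lost {{dropped}} events. {{/ctx.payload._value}}\"}"
  }
}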
The condition checks for plain inequality because I have no need for thresholds.
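If a threshold ever becomes necessary, the same condition can compare the delta instead; the value 10 below is just an arbitrary example:
return ctx.payload.aggregations.hosts.buckets.stream()
  .anyMatch(el -> el.event_stats.max - el.event_stats.min > 10);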
The delta calculation for the alert is a bit dangerous: it does not account for counter resets (e.g. after a Filebeat restart) or overflows. Any suggestions for that, or other criticism, are highly appreciated.
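One possible mitigation, sketched but untested: replace the stats aggregation with a per-host date_histogram plus a derivative pipeline, and have the transform sum only the positive per-interval deltas. A counter reset then shows up as a single negative derivative, which can be skipped instead of corrupting the total:
"aggs": {
  "hosts": {
    "terms": { "field": "beats_stats.beat.host" },
    "aggs": {
      "per_10s": {
        "date_histogram": { "field": "timestamp", "fixed_interval": "10s" },
        "aggs": {
          "counter": { "max": { "field": "beats_stats.metrics.libbeat.output.events.dropped" } },
          "delta": { "derivative": { "buckets_path": "counter" } }
        }
      }
    }
  }
}
The script side would then look something like this, where hostBucket stands for one element of ctx.payload.aggregations.hosts.buckets:
long dropped = 0;
for (b in hostBucket.per_10s.buckets) {
  // the first bucket has no derivative; negative deltas indicate a counter reset
  if (b.delta != null && b.delta.value > 0) {
    dropped += (long) b.delta.value;
  }
}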
All the best, and stay safe
Serge