I am trying to create a Watcher alert over the monitoring indices. I want to aggregate over N different clusters and output each node, along with its cluster name, when its disk utilization goes above a specified threshold percentage. Here is the aggregation:
"aggs": {
  "clusters": {
    "terms": {
      "field": "cluster_uuid",
      "size": 100
    },
    "aggs": {
      "nodes": {
        "terms": {
          "field": "source_node.name",
          "size": 100
        },
        "aggs": {
          "total_in_bytes": {
            "max": {
              "field": "node_stats.fs.total.total_in_bytes"
            }
          },
          "available_in_bytes": {
            "max": {
              "field": "node_stats.fs.total.available_in_bytes"
            }
          },
          "free_ratio": {
            "bucket_script": {
              "buckets_path": {
                "available_in_bytes": "available_in_bytes",
                "total_in_bytes": "total_in_bytes"
              },
              "script": "params.available_in_bytes / params.total_in_bytes"
            }
          }
        }
      }
    }
  }
},
Here is my transform script. It works, but unlike my condition it does not filter out the nodes that fail the criteria:
"condition": {
  "script": {
    "lang": "painless",
    "source": "return ctx.payload.aggregations.clusters.buckets.stream().anyMatch(cluster -> cluster.nodes.buckets.stream().anyMatch(node -> node.free_ratio.value < ctx.metadata.lower_bound));"
  }
},
"transform": {
  "script": {
    "lang": "painless",
    "source": "def hosts = []; ctx.payload.aggregations.clusters.buckets.stream().forEach(cluster -> cluster.nodes.buckets.stream().forEach(node -> hosts.add(['cluster':cluster.key,'node':node.key,'available_in_gb':Math.round((node.available_in_bytes.value/1073741824) * 100)/100,'total_in_gb':Math.round((node.total_in_bytes.value/1073741824)* 100)/100])));return ['hosts': hosts]"
  }
},
I want to filter out the nodes whose disk utilization is below the threshold, so that they are not transformed and sent to the action block. Adding a filter to the transform above does not work, so I tried something like this in a single transform block:
"source": "ctx.payload.aggregations.clusters.buckets.stream().forEach(cluster -> cluster.nodes.buckets.stream().filter(it -> it.free_ratio.value < ctx.metadata.lower_bound).map(it -> ['cluster_name':cluster.key,'node_name':it.key,'available_in_gb':Math.round((it.available_in_bytes.value/1073741824) * 100)/100,'total_in_gb':Math.round((it.total_in_bytes.value/1073741824)* 100)/100])).collect(Collectors.toList())"
but I get this exception:
"script_stack": [
"it -> ['cluster_name':cluster.key,'node_name':it.key,'available_in_gb':Math.round((it.available_in_bytes.value/1073741824) * 100)/100,'total_in_gb':Math.round((it.total_in_bytes.value/1073741824)* 100)/100])).collect(Collectors.toList())",
" ^---- HERE"
],
"script": "ctx.payload.aggregations.clusters.buckets.stream().forEach(cluster -> cluster.nodes.buckets.stream().filter(it -> it.free_ratio.value < ctx.metadata.lower_bound).map(it -> ['cluster_name':cluster.key,'node_name':it.key,'available_in_gb':Math.round((it.available_in_bytes.value/1073741824) * 100)/100,'total_in_gb':Math.round((it.total_in_bytes.value/1073741824)* 100)/100])).collect(Collectors.toList())",
"lang": "painless",
"caused_by": {
"type": "null_pointer_exception",
"reason": null
}
I also tried a chain transform, which didn't work either. Is there an alternative way to filter or flatten this sub-aggregation?
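
For reference, here is a sketch of the result I am after, written with plain loops instead of streams. It is untested. It assumes the aggregation names shown earlier and that `ctx.metadata.lower_bound` is set in the watch metadata. I suspect, but have not confirmed, that the null_pointer_exception comes from `.collect(...)` being called on the result of `forEach`, which returns nothing. The loop version avoids that call entirely. I also divide by `100.0` rather than `100` here, on the assumption that the rounding should keep two decimals.

```painless
// Flattened list of nodes that crossed the threshold, one entry per node.
def hosts = [];
def bytesPerGb = 1073741824.0;

for (def cluster : ctx.payload.aggregations.clusters.buckets) {
  for (def node : cluster.nodes.buckets) {
    // Keep only nodes whose free ratio is under the configured lower bound.
    if (node.free_ratio.value < ctx.metadata.lower_bound) {
      hosts.add([
        'cluster': cluster.key,
        'node': node.key,
        'available_in_gb': Math.round((node.available_in_bytes.value / bytesPerGb) * 100) / 100.0,
        'total_in_gb': Math.round((node.total_in_bytes.value / bytesPerGb) * 100) / 100.0
      ]);
    }
  }
}

return ['hosts': hosts];
```

To use it as the transform's "source", the script would need to be collapsed onto one line or wrapped in a triple-quoted string in the Kibana console.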