Transform fails with null_pointer_exception for nested aggregation on adding a filter condition

I am trying to create a Watcher alert over the monitoring indices. I want to aggregate over N different clusters and output each node together with its cluster name whenever disk utilization goes above a specified threshold percentage.

	      "aggs": {
            "clusters": {
              "terms": {
                "field": "cluster_uuid",
                "size": 100
              },
			  "aggs": {
                "nodes": {
                  "terms": {
                    "field": "source_node.name",
                    "size": 100
                  },
                  "aggs": {
                    "total_in_bytes": {
                      "max": {
                        "field": "node_stats.fs.total.total_in_bytes"
                      }
                    },
                   "available_in_bytes": {
                      "max": {
                        "field": "node_stats.fs.total.available_in_bytes"
                      }
                    },
                   "free_ratio": {
                     "bucket_script": {
                       "buckets_path": {
                         "available_in_bytes": "available_in_bytes",
                         "total_in_bytes": "total_in_bytes"
                       },
                       "script": "params.available_in_bytes / params.total_in_bytes"
                     }
                   }
                 }
                }
			  }
			}  
          },
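For reference, each bucket in the aggregation response that the scripts below walk over looks roughly like this (UUIDs and values are made up):

    "aggregations": {
      "clusters": {
        "buckets": [
          {
            "key": "4hx-example-cluster-uuid",
            "nodes": {
              "buckets": [
                {
                  "key": "node-1",
                  "total_in_bytes": { "value": 107374182400 },
                  "available_in_bytes": { "value": 21474836480 },
                  "free_ratio": { "value": 0.2 }
                }
              ]
            }
          }
        ]
      }
    }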

Here are my condition and transform scripts. The transform works, but it does not filter out the nodes that don't meet the criteria my condition checks for:

"condition": {
    "script": {
    "lang": "painless",
    "source": "return ctx.payload.aggregations.clusters.buckets.stream().anyMatch(cluster -> cluster.nodes.buckets.stream().anyMatch(node -> node.free_ratio.value < ctx.metadata.lower_bound));"
	}
  },
  "transform": {
    "script": {
    "lang": "painless",
    "source": "def hosts = []; ctx.payload.aggregations.clusters.buckets.stream().forEach(cluster -> cluster.nodes.buckets.stream().forEach(node -> hosts.add(['cluster':cluster.key,'node':node.key,'available_in_gb':Math.round((node.available_in_bytes.value/1073741824) * 100)/100,'total_in_gb':Math.round((node.total_in_bytes.value/1073741824)* 100)/100])));return ['hosts': hosts]"
    }
  },
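With the made-up numbers from the sample response above, this transform produces a payload like:

    {
      "hosts": [
        {
          "cluster": "4hx-example-cluster-uuid",
          "node": "node-1",
          "available_in_gb": 20,
          "total_in_gb": 100
        }
      ]
    }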

I want the transform to keep only the nodes that breach the threshold, so that the others are not sent to the action block. Adding a filter to the above does not work, so I tried something like this in a single transform block:

"source": "ctx.payload.aggregations.clusters.buckets.stream().forEach(cluster -> cluster.nodes.buckets.stream().filter(it -> it.free_ratio.value < ctx.metadata.lower_bound).map(it -> ['cluster_name':cluster.key,'node_name':it.key,'available_in_gb':Math.round((it.available_in_bytes.value/1073741824) * 100)/100,'total_in_gb':Math.round((it.total_in_bytes.value/1073741824)* 100)/100])).collect(Collectors.toList())"

but I get this exception:

"script_stack": [

  "it -&gt; ['cluster_name':cluster.key,'node_name':it.key,'available_in_gb':Math.round((it.available_in_bytes.value/1073741824) * 100)/100,'total_in_gb':Math.round((it.total_in_bytes.value/1073741824)* 100)/100])).collect(Collectors.toList())",

  " ^---- HERE"

],

"script": "ctx.payload.aggregations.clusters.buckets.stream().forEach(cluster -&gt; cluster.nodes.buckets.stream().filter(it -&gt; it.free_ratio.value &lt; ctx.metadata.lower_bound).map(it -&gt; ['cluster_name':cluster.key,'node_name':it.key,'available_in_gb':Math.round((it.available_in_bytes.value/1073741824) * 100)/100,'total_in_gb':Math.round((it.total_in_bytes.value/1073741824)* 100)/100])).collect(Collectors.toList())",

"lang": "painless",

"caused_by": {

"type": "null_pointer_exception",

"reason": null

}

I also tried a chain transform, which didn't work. Is there an alternative way to filter or flatten this sub-aggregation?

Having all those snippets makes reading pretty hard. It's much easier if you can provide all the information at once in a gist. All the information means:

  • the whole watch
  • the full output of the execute watch API or a watcher history entry
  • if there are any stack traces in the logfiles, please include those as well

I am not sure off the top of my head if .forEach() can actually collect data into a list or just returns void. A full stack trace (which should be part of the Execute Watch API response or the watch history entry) should reveal more, and would also make it possible to reproduce this without needing any data.
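From a quick check, Stream.forEach does return void (it is a terminal operation), so the data has to be gathered either with .collect() or by adding to a list declared outside the lambda. A minimal Painless sketch with hypothetical bucket data:

    // hypothetical stand-in data; in the watch this would be
    // ctx.payload.aggregations.clusters.buckets
    def buckets = [['key': 'node-1'], ['key': 'node-2']];

    // pattern 1: let the stream build the list via collect
    def keys = buckets.stream().map(b -> b.key).collect(Collectors.toList());

    // pattern 2: forEach returns void, so add to an external list as a side effect
    def hosts = [];
    buckets.stream().forEach(b -> hosts.add(b.key));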

--Alex

@spinscale here is the watch and its execution.

Using filter instead of forEach gives me this error:

"transform": {
    "script": {
    "lang": "painless",
    "source": "ctx.payload.aggregations.clusters.buckets.stream().filter(cluster -> cluster.nodes.buckets.stream().filter(it -> it.free_ratio.value < ctx.metadata.lower_bound).map(it -> ['cluster_name':cluster.key,'node_name':it.key,'available_in_gb':Math.round((it.available_in_bytes.value/1073741824) * 100)/100,'total_in_gb':Math.round((it.total_in_bytes.value/1073741824)* 100)/100])).collect(Collectors.toList())"
    }
  }

Error

"type": "script_exception",

"reason": "runtime error",

"script_stack": [

  "java.util.stream.ReferencePipeline$2$1.accept(ReferencePipeline.java:174)",

  "java.util.ArrayList$ArrayListSpliterator.forEachRemaining(ArrayList.java:1374)",

  "java.util.stream.AbstractPipeline.copyInto(AbstractPipeline.java:481)",

  "java.util.stream.AbstractPipeline.wrapAndCopyInto(AbstractPipeline.java:471)",

  "java.util.stream.ReduceOps$ReduceOp.evaluateSequential(ReduceOps.java:708)",

  "java.util.stream.AbstractPipeline.evaluate(AbstractPipeline.java:234)",

  "java.util.stream.ReferencePipeline.collect(ReferencePipeline.java:499)",

  "it -&gt; ['cluster_name':cluster.key,'node_name':it.key,'available_in_gb':Math.round((it.available_in_bytes.value/1073741824) * 100)/100,'total_in_gb':Math.round((it.total_in_bytes.value/1073741824)* 100)/100])).collect(Collectors.toList())",

  " ^---- HERE"

],

"script": "ctx.payload.aggregations.clusters.buckets.stream().filter(cluster -&gt; cluster.nodes.buckets.stream().filter(it -&gt; it.free_ratio.value &lt; ctx.metadata.lower_bound).map(it -&gt; ['cluster_name':cluster.key,'node_name':it.key,'available_in_gb':Math.round((it.available_in_bytes.value/1073741824) * 100)/100,'total_in_gb':Math.round((it.total_in_bytes.value/1073741824)* 100)/100])).collect(Collectors.toList())",

"lang": "painless",

"caused_by": {

"type": "class_cast_exception",

"reason": "java.util.stream.ReferencePipeline$3 cannot be cast to java.lang.Number"

}

}

},

"actions": [],

},

"messages": [

  "failed to execute watch transform"

],

You can always step away from using lambdas and just go with plain for loops for better readability, which is what I did here (I think there's a bracket mistake somewhere in your script, but it is hard to spot).
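The class_cast_exception is also a hint: the lambda passed to the outer filter returns the inner stream (the result of .map(...)) rather than a boolean, so the predicate fails at runtime. If you want to stay with streams, flatMap is the operation for flattening nested buckets; an untested sketch:

    def hosts = ctx.payload.aggregations.clusters.buckets.stream()
      .flatMap(cluster -> cluster.nodes.buckets.stream()
        .filter(node -> node.free_ratio.value < ctx.metadata.lower_bound)
        .map(node -> ['cluster_name':cluster.key, 'node_name':node.key]))
      .collect(Collectors.toList());
    return ['hosts': hosts];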

How about

   "transform": {
      "script": {
        "lang": "painless",
        "source": """
        def clusters = ctx.payload.aggregations.clusters.buckets;
        def data = [];
        for (def i=0 ; i < clusters.length ; i++) {
          def cluster = clusters[i];
          def buckets = cluster.nodes.buckets.stream()
            .filter(b -> b.free_ratio.value < ctx.metadata.lower_bound)
            .map(it -> ['cluster_name':cluster.key,'node_name':it.key])
            .collect(Collectors.toList());
            data.addAll(buckets);
        }
        return data
        """
      }
    },
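One caveat with returning a bare list like this: if I remember correctly, when a transform script returns anything other than a map, Watcher stores the result under the _value key of the payload, so actions would have to read ctx.payload._value. To keep a named key, return a map at the end instead:

    return ['hosts': data];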
 

I got around it by doing something similar, without having to loop through explicitly:

    def hosts = [];
    ctx.payload.aggregations.clusters.buckets.stream()
      .forEach(cluster -> cluster.nodes.buckets.stream()
        .filter(node -> node.memory.value > ctx.metadata.jvm_usage_threshold)
        .forEach(node -> hosts.add(['cluster':cluster.key, 'node':node.key, 'avg_jvm_usage':node.memory.value])));
    return ['hosts': hosts];
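Since the script returns ['hosts': hosts], an action can iterate the list directly with mustache; for example, a hypothetical logging action:

    "actions": {
      "log_breaches": {
        "logging": {
          "text": "JVM usage above threshold on: {{#ctx.payload.hosts}}{{cluster}}/{{node}} ({{avg_jvm_usage}}) {{/ctx.payload.hosts}}"
        }
      }
    }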

