Aggregating the sum of sub-aggregation

I've been trying to put together an aggregation on what I would call an "intermediate bucket" (aggregating the total of a bucket).

The data is coming from packetbeat, where I'm trying to aggregate the total number bytes sent by source between a host/process.

The data looks a little like follows (not really sure if helpful shown this way).
source.hostname|source.process.name|destination.hostname|destination.port|flow.id|source.bytes|destination.bytes
host_a|scp|host_b|1440|flow_1|500|120
host_a|scp|host_b|1440|flow_2|1020|180
host_a|firefox|host_b|443|flow_3|1580|170

As a new flow ID is created when a connection is closed and another opened to the same host/port, I want to aggregate the sum of the source/destination bytes on the path:
source.hostname>source.process.name>destination.hostname>destination.port

I want to take the latest document for each flow.id and sum up based on the other fields, so that the result would look like the following:
host_a|scp|host_b|1440|1520|300
host_a|firefox|host_b|443|1580|170

I've managed to get the latest value of each flow.id by doing a bucket max on the @timestamp. However, I'm struggling to aggregate the sum of these together based on the previous buckets.

Could someone point me in the right direction of how I would aggregate in this way?

Below is the JSON for the aggregations from my Kibana visualisation:

     "aggs": {
    "2": {
      "terms": {
        "field": "source.hostname",
        "order": {
          "_key": "desc"
        },
        "size": 1000
      },
      "aggs": {
        "3": {
          "terms": {
            "field": "source.process.name",
            "order": {
              "_key": "asc"
            },
            "size": 1000
          },
          "aggs": {
            "4": {
              "terms": {
                "field": "destination.hostname",
                "order": {
                  "_key": "asc"
                },
                "size": 1000
              },
              "aggs": {
                "5": {
                  "terms": {
                    "field": "destination.port",
                    "order": {
                      "_key": "asc"
                    },
                    "size": 1000
                  },
                  "aggs": {
                    "9": {
                      "terms": {
                        "field": "flow.id",
                        "order": {
                          "_key": "desc"
                        },
                        "size": 1000
                      },
                      "aggs": {
                        "7": {
                          "max_bucket": {
                            "buckets_path": "7-bucket>7-metric"
                          }
                        },
                        "8": {
                          "max_bucket": {
                            "buckets_path": "8-bucket>8-metric"
                          }
                        },
                        "11": {
                          "sum_bucket": {
                            "buckets_path": "11-bucket>11-metric"
                          }
                        },
                        "11-bucket": {
                          "terms": {
                            "field": "@timestamp",
                            "order": {
                              "_key": "desc"
                            },
                            "size": 1
                          },
                          "aggs": {
                            "11-metric": {
                              "sum": {
                                "field": "destination.bytes"
                              }
                            }
                          }
                        },
                        "7-bucket": {
                          "terms": {
                            "field": "@timestamp",
                            "order": {
                              "_key": "desc"
                            },
                            "size": 1
                          },
                          "aggs": {
                            "7-metric": {
                              "sum": {
                                "field": "source.bytes"
                              }
                            }
                          }
                        },
                        "8-bucket": {
                          "terms": {
                            "field": "@timestamp",
                            "order": {
                              "_key": "desc"
                            },
                            "size": 1
                          },
                          "aggs": {
                            "8-metric": {
                              "sum": {
                                "field": "destination.bytes"
                              }
                            }
                          }
                        }
                      }
                    }
                  }
                }
              }
            }
          }
        }
      }
    }
      }

This topic was automatically closed 28 days after the last reply. New replies are no longer allowed.