I've been trying to put together an aggregation over what I would call an "intermediate bucket" (that is, aggregating the totals produced by a nested bucket).
The data comes from Packetbeat, and I'm trying to aggregate the total number of bytes sent by the source for each host/process.
The data looks roughly like the following (I'm not sure whether showing it this way is helpful):
source.hostname|source.process.name|destination.hostname|destination.port|flow.id|source.bytes|destination.bytes
host_a|scp|host_b|1440|flow_1|500|120
host_a|scp|host_b|1440|flow_2|1020|180
host_a|firefox|host_b|443|flow_3|1580|170
As a new flow ID is created when a connection is closed and another opened to the same host/port, I want to aggregate the sum of the source/destination bytes on the path:
source.hostname>source.process.name>destination.hostname>destination.port
I want to take the latest document for each flow.id and sum up based on the other fields, so that the result would look like the following:
host_a|scp|host_b|1440|1520|300
host_a|firefox|host_b|443|1580|170
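To make the intended computation concrete, here is a minimal Python sketch of the two steps (keep the latest document per flow.id, then sum per path). It is only an illustration of the desired result, not an Elasticsearch query. The timestamps and the two earlier snapshots are invented for the example, and the function name `sum_latest_per_flow` is hypothetical.

```python
from collections import defaultdict

FIELDS = ("@timestamp", "source.hostname", "source.process.name",
          "destination.hostname", "destination.port", "flow.id",
          "source.bytes", "destination.bytes")

# Rows from the sample above, plus invented timestamps and two invented
# earlier snapshots (flow_1 and flow_3) to show that only the latest counts.
ROWS = [
    ("2020-01-01T00:00:10Z", "host_a", "scp", "host_b", 1440, "flow_1", 200, 60),
    ("2020-01-01T00:00:20Z", "host_a", "scp", "host_b", 1440, "flow_1", 500, 120),
    ("2020-01-01T00:00:30Z", "host_a", "scp", "host_b", 1440, "flow_2", 1020, 180),
    ("2020-01-01T00:00:15Z", "host_a", "firefox", "host_b", 443, "flow_3", 900, 100),
    ("2020-01-01T00:00:25Z", "host_a", "firefox", "host_b", 443, "flow_3", 1580, 170),
]
docs = [dict(zip(FIELDS, row)) for row in ROWS]

PATH = ("source.hostname", "source.process.name",
        "destination.hostname", "destination.port")


def sum_latest_per_flow(docs):
    """Return {path: (source_bytes, destination_bytes)} using only the
    most recent document of each flow.id."""
    # Step 1: keep the document with the greatest @timestamp per flow.id.
    # ISO-8601 strings in the same format compare correctly as strings.
    latest = {}
    for doc in docs:
        seen = latest.get(doc["flow.id"])
        if seen is None or doc["@timestamp"] > seen["@timestamp"]:
            latest[doc["flow.id"]] = doc

    # Step 2: sum the byte counters of those documents per path.
    totals = defaultdict(lambda: [0, 0])
    for doc in latest.values():
        key = tuple(doc[field] for field in PATH)
        totals[key][0] += doc["source.bytes"]
        totals[key][1] += doc["destination.bytes"]
    return {key: tuple(value) for key, value in totals.items()}


totals = sum_latest_per_flow(docs)
for path, byte_counts in totals.items():
    print("|".join(str(part) for part in path + byte_counts))
```

With the sample rows, the scp path sums flow_1 and flow_2, and the firefox path contains only flow_3, which matches the two result lines above.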
I've managed to get the latest value for each flow.id by taking a bucket max over @timestamp. However, I'm struggling to sum these per-flow values within the enclosing buckets.
Could someone point me in the right direction for aggregating in this way?
Below is the JSON for the aggregations from my Kibana visualisation:
"aggs": {
  "2": {
    "terms": {
      "field": "source.hostname",
      "order": {
        "_key": "desc"
      },
      "size": 1000
    },
    "aggs": {
      "3": {
        "terms": {
          "field": "source.process.name",
          "order": {
            "_key": "asc"
          },
          "size": 1000
        },
        "aggs": {
          "4": {
            "terms": {
              "field": "destination.hostname",
              "order": {
                "_key": "asc"
              },
              "size": 1000
            },
            "aggs": {
              "5": {
                "terms": {
                  "field": "destination.port",
                  "order": {
                    "_key": "asc"
                  },
                  "size": 1000
                },
                "aggs": {
                  "9": {
                    "terms": {
                      "field": "flow.id",
                      "order": {
                        "_key": "desc"
                      },
                      "size": 1000
                    },
                    "aggs": {
                      "7": {
                        "max_bucket": {
                          "buckets_path": "7-bucket>7-metric"
                        }
                      },
                      "8": {
                        "max_bucket": {
                          "buckets_path": "8-bucket>8-metric"
                        }
                      },
                      "11": {
                        "sum_bucket": {
                          "buckets_path": "11-bucket>11-metric"
                        }
                      },
                      "11-bucket": {
                        "terms": {
                          "field": "@timestamp",
                          "order": {
                            "_key": "desc"
                          },
                          "size": 1
                        },
                        "aggs": {
                          "11-metric": {
                            "sum": {
                              "field": "destination.bytes"
                            }
                          }
                        }
                      },
                      "7-bucket": {
                        "terms": {
                          "field": "@timestamp",
                          "order": {
                            "_key": "desc"
                          },
                          "size": 1
                        },
                        "aggs": {
                          "7-metric": {
                            "sum": {
                              "field": "source.bytes"
                            }
                          }
                        }
                      },
                      "8-bucket": {
                        "terms": {
                          "field": "@timestamp",
                          "order": {
                            "_key": "desc"
                          },
                          "size": 1
                        },
                        "aggs": {
                          "8-metric": {
                            "sum": {
                              "field": "destination.bytes"
                            }
                          }
                        }
                      }
                    }
                  }
                }
              }
            }
          }
        }
      }
    }
  }
}