Time Series + Aggregation + Netflow

We are looking to generate time series data from Netflow records stored in ES, and we're facing some challenges in determining the appropriate way to perform the necessary aggregations.

In our case, each Netflow record is stored as a separate document, using a schema that is similar to the one used by the Logstash Netflow Module. (The fields in question here are actually identical.)

curl -XPUT 'http://localhost:9200/_template/netflow' -H 'Content-Type: application/json' -d'
{
    "order": 0,
    "template": "netflow-*",
    "mappings": {
        "_default_": {
            "_meta": {
                "version": "1.0.0"
            },
            "properties": {
                "@timestamp": {
                    "type": "date",
                    "format": "epoch_millis"
                },
                "netflow": {
                    "dynamic": true,
                    "type": "object",
                    "properties": {
                        "bytes": {
                            "type": "long"
                        },
                       "first_switched": {
                            "type": "date"
                        },
                        "last_switched": {
                            "type": "date"
                        }
                    }
                }
            }
         }
    },
    "aliases": { }
}
'

We are interested in generating time series for the number of bytes transferred on some filtered subset of the flow documents (i.e. on some host, on some interface, etc.).
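
For illustration, the kind of filter we have in mind would look something like the sketch below; the interface field name (netflow.input_snmp) is just an assumption about how the dynamically mapped Netflow fields would be named, and the two range clauses select only the flows whose [first_switched, last_switched] interval overlaps the requested window. The aggregation we are after would then be nested under this query.

curl -XPOST 'http://localhost:9200/netflow-*/_search' -H 'Content-Type: application/json' -d'
{
    "size": 0,
    "query": {
        "bool": {
            "filter": [
                { "term": { "netflow.input_snmp": 1 } },
                { "range": { "netflow.first_switched": { "lt": 500 } } },
                { "range": { "netflow.last_switched": { "gte": 0 } } }
            ]
        }
    }
}
'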

For example, if we have a single flow record with:

curl -XPOST 'http://localhost:9200/netflow-1970/flow' -H 'Content-Type: application/json' -d'
{
    "timestamp": 460,
    "netflow.first_switched": 100,
    "netflow.last_switched": 450,
    "netflow.bytes": 350
}
'

We’d like to be able to generate a time series with start=0, end=500, step=100, and have the following data points:

t=0, bytes=0
t=100, bytes=100
t=200, bytes=100
t=300, bytes=100
t=400, bytes=50
t=500, bytes=0

In this case, each step (or bucket) would contain a fraction of the bytes, relative to how much of the flow falls into that step. We assume that the flow bytes are evenly spread across the flow's range, and if there were multiple flow records in a single step, we would sum their corresponding contributions. In the example above, the flow spans 350 ms (from 100 to 450) and carries 350 bytes, i.e. 1 byte per millisecond, so the bucket starting at t=400 overlaps the flow for only 50 ms and receives 50 bytes.

The problem is that we have documents which correspond to some time range/interval, and we’d like to:

  1. Group these into buckets based on whether or not the bucket overlaps the document's interval (each document may fall into multiple buckets)
  2. Perform some metric based aggregation that uses the ratio of time for which the document’s “interval” falls into that bucket as part of the computation (a document may fall completely, or only partially in a given bucket)

Looking at the existing facilities, the "Date Histogram Aggregation" gets us pretty close to the bucketing behavior, but it has no concept of intervals or ranges. If the specified field has multiple values, it is possible for a document to fall into multiple buckets, but this would require us to index every date within the interval at the granularity of the smallest step for which we would like to query.
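
For reference, this is roughly as close as we can get with the stock aggregations (a sketch, using the example window above): a date_histogram on last_switched with a 100ms interval, min_doc_count and extended_bounds to force the empty steps, and a sum of netflow.bytes per bucket. For the example flow this puts all 350 bytes into the single bucket with key 400, rather than spreading them across t=100 through t=400 as described above.

curl -XPOST 'http://localhost:9200/netflow-*/_search' -H 'Content-Type: application/json' -d'
{
    "size": 0,
    "aggs": {
        "steps": {
            "date_histogram": {
                "field": "netflow.last_switched",
                "interval": "100ms",
                "min_doc_count": 0,
                "extended_bounds": {
                    "min": 0,
                    "max": 500
                }
            },
            "aggs": {
                "bytes": {
                    "sum": {
                        "field": "netflow.bytes"
                    }
                }
            }
        }
    }
}
'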

Now, assuming the buckets were properly created, the metric aggregation would need either the value of the ratio itself, or access to the bucket key, the bucket interval, and the document's range start and end in order to derive it, i.e. ratio = (min(bucket_end, last_switched) - max(bucket_start, first_switched)) / (last_switched - first_switched).

Does anyone have any ideas on how we could perform such aggregations using ES or has anyone come across a similar use case before?

Doing these aggregations client side isn't an option for us because of the sheer volume of data we would need to fetch.

Since we haven't been able to come up with a solution using the existing aggregations, we've started developing a plugin for this particular use case. It needs some cleanup, but it's currently able to generate the series as described above: https://github.com/OpenNMS/elasticsearch-drift-plugin
