The Situation
We are storing time series data with non-uniform sampling in Elasticsearch. Each document has certain fields that we need to aggregate to compute summary metrics. Because the sampling is not uniform, we currently store a duration field with each document, so that we can weight aggregations by the sum of the durations, compute time-weighted averages, and so on.
POST test_index/test_type
{
  "timestamp": "2015-03-25T17:49:00.000Z",
  "state": "State_1",
  "duration": 50
}
POST test_index/test_type
{
  "timestamp": "2015-03-25T17:49:00.050Z",
  "state": "State_2",
  "duration": 100
}
POST test_index/test_type
{
  "timestamp": "2015-03-25T17:49:00.150Z",
  "state": "State_2",
  "duration": 1000
}
POST test_index/test_type
{
  "timestamp": "2015-03-25T17:49:01.150Z",
  "state": "State_1",
  "duration": 2000
}
However, this gets complicated when we need to add a new document in the middle of the time series. In that case we have to insert the new record, update the duration of the preceding record, and compute the duration of the new record from the timestamp of the next one. Since we expect a lot of these modifications, this is hard to maintain.
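To make the bookkeeping concrete, here is a minimal Python sketch of what a mid-series insert involves when durations are stored. The in-memory list `docs` and the helper `insert_with_duration` are hypothetical stand-ins for the index and our update logic, not Elasticsearch APIs, and treating the last record as open-ended (`None`) is an assumption.

```python
from bisect import bisect_left

# Hypothetical in-memory stand-in for the index: [epoch_ms, state, duration_ms],
# kept sorted by timestamp. Values mirror the sample documents above.
docs = [
    [1427305740000, "State_1", 50],
    [1427305740050, "State_2", 100],
    [1427305740150, "State_2", 1000],
    [1427305741150, "State_1", 2000],
]

def insert_with_duration(docs, ts, state):
    """Insert a record and patch the stored durations around it.

    Returns the indices of every record that had to be written.
    """
    keys = [d[0] for d in docs]
    i = bisect_left(keys, ts)
    if i < len(docs) and docs[i][0] == ts:
        raise ValueError("only one observation per timestamp is allowed")
    # Duration of the new record runs until the next record, if there is one.
    # Assumption: a record appended at the end is open-ended (None).
    duration = docs[i][0] - ts if i < len(docs) else None
    docs.insert(i, [ts, state, duration])
    touched = [i]
    if i > 0:
        # The preceding record now ends where the new record begins.
        docs[i - 1][2] = ts - docs[i - 1][0]
        touched.insert(0, i - 1)
    return touched

touched = insert_with_duration(docs, 1427305740100, "State_1")
print(touched)  # -> [1, 2]
```

Every mid-series insert therefore costs two writes (the new record plus the preceding one), and they have to be applied consistently.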
Since there is a single timeline, there is only one observation per timestamp, and the duration is always (next timestamp - current timestamp), we were wondering whether it is possible to create some sort of virtual field or aggregation that derives the duration at query time instead of storing it.
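The rule we would like Elasticsearch to apply can be written down in a few lines. This is only a sketch of the intended semantics in Python, with a hypothetical helper name `derive_durations`; leaving the last observation open-ended (`None`) is an assumption.

```python
def derive_durations(timestamps_ms):
    """Pair each timestamp with (next timestamp - current timestamp).

    The last observation has no successor, so its duration is None.
    """
    ordered = sorted(timestamps_ms)
    durations = [nxt - cur for cur, nxt in zip(ordered, ordered[1:])]
    durations.append(None)
    return list(zip(ordered, durations))

sample = [1427305740000, 1427305740050, 1427305740150, 1427305741150]
derived = derive_durations(sample)
for ts, duration in derived:
    print(ts, duration)
```

With this definition, inserting a record in the middle needs no updates to its neighbours, because nothing is stored.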
Possible Solutions
We tried combining a histogram aggregation that puts each document in its own bucket with a serial differencing pipeline aggregation. For example:
GET test_index/_search?size=0&timeout=10s&filter_path=aggregations.timestamps.buckets.duration,aggregations.timestamps.buckets.timestamp,took,aggregations.sum_durations
{
  "aggs": {
    "timestamps": {
      "histogram": {
        "field": "timestamp",
        "interval": 1,
        "min_doc_count": 1
      },
      "aggs": {
        "timestamp": {
          "max": {
            "field": "timestamp"
          }
        },
        "duration": {
          "serial_diff": {
            "buckets_path": "timestamp",
            "lag": 1
          }
        }
      }
    },
    "sum_durations": {
      "sum_bucket": {
        "buckets_path": "timestamps>duration"
      }
    }
  }
}
which gives the following output:
{
  "took": 5,
  "aggregations": {
    "timestamps": {
      "buckets": [
        {
          "timestamp": {
            "value": 1427305740000,
            "value_as_string": "2015-03-25T17:49:00.000Z"
          }
        },
        {
          "timestamp": {
            "value": 1427305740050,
            "value_as_string": "2015-03-25T17:49:00.050Z"
          },
          "duration": {
            "value": 50
          }
        },
        {
          "timestamp": {
            "value": 1427305740150,
            "value_as_string": "2015-03-25T17:49:00.150Z"
          },
          "duration": {
            "value": 100
          }
        },
        {
          "timestamp": {
            "value": 1427305741150,
            "value_as_string": "2015-03-25T17:49:01.150Z"
          },
          "duration": {
            "value": 1000
          }
        }
      ]
    },
    "sum_durations": {
      "value": 1150
    }
  }
}
Problems with the approach
Although this approach lets us compute durations on the fly, it has a few deal-breaking issues for us:
- The duration value is attached to the next data item instead of the previous one, so the time for which a data item was active can only be read off the item that follows it. Our data is stateful, though: each data item describes what happens from that point onwards, not what happened up to that point. There is no way to give a negative lag in Elasticsearch.
- Finding the duration for non-contiguous stretches of the timeline (e.g. the total duration of all cycles in "State_1") is not straightforward. We could possibly:
  - First compute the implicit durations over the whole timeline, as before
  - Filter the aggregated buckets by state
  - Run an aggregation over the filtered buckets
This seems needlessly complex, and we haven't been able to get it to work so far. Is there another approach? I can provide more details if you want to know the context.
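For reference, this is the result we are after, sketched client-side in Python. It assumes the (timestamp, state) pairs have already been fetched from the index, which may not scale to our data volume; `duration_by_state` is a hypothetical helper, and the last observation is assumed to be open-ended, so it contributes nothing.

```python
from collections import defaultdict

def duration_by_state(observations):
    """Sum forward-looking durations per state.

    observations: iterable of (epoch_ms, state) pairs, one per timestamp.
    Each observation is active until the next one starts; the last
    observation has no successor and is skipped (assumption).
    """
    ordered = sorted(observations)
    totals = defaultdict(int)
    for (ts, state), (next_ts, _) in zip(ordered, ordered[1:]):
        totals[state] += next_ts - ts
    return dict(totals)

observations = [
    (1427305740000, "State_1"),
    (1427305740050, "State_2"),
    (1427305740150, "State_2"),
    (1427305741150, "State_1"),
]
totals = duration_by_state(observations)
print(totals)  # -> {'State_1': 50, 'State_2': 1100}
```

Here each duration is credited to the item that starts the interval (the equivalent of a negative lag), and the per-state totals add up to the same 1150 ms that `sum_durations` reports.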