Cumulative sum of derivatives

I am currently using Elasticsearch to store time-series data: the incoming packet count (an incremental counter) for each port on a network device.

I am running aggregations to calculate the number of packets received in each time interval (a date histogram), and I also want to calculate the total number of packets received over the whole queried interval.

The number of packets received in a time interval is the derivative of the maximum of the running counter.
The total number of packets received over the queried interval is the cumulative sum of those derivatives.
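
To make the two quantities concrete, here is a minimal Python sketch of the arithmetic, outside Elasticsearch. The function names are hypothetical; the values 58 and 169 come from the sample result further down, and the third value is made up for illustration. The first bucket has no previous bucket, so its derivative is undefined (None here).

```python
from typing import List, Optional

def derivative(max_per_bucket: List[float]) -> List[Optional[float]]:
    """Difference between each bucket's max counter and the previous bucket's."""
    if not max_per_bucket:
        return []
    out: List[Optional[float]] = [None]  # first bucket has nothing to diff against
    for prev, cur in zip(max_per_bucket, max_per_bucket[1:]):
        out.append(cur - prev)
    return out

def cumulative_sum(values: List[Optional[float]]) -> List[float]:
    """Running total that skips undefined (None) entries."""
    total = 0.0
    out = []
    for v in values:
        if v is not None:
            total += v
        out.append(total)
    return out

maxes = [58, 169, 200]          # per-bucket max of the counter (200 is illustrative)
ders = derivative(maxes)        # packets received in each interval
totals = cumulative_sum(ders)   # packets received so far in the queried range
```

With a counter that only increases, the last running total equals the last max minus the first max.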

{
  "query" : {
    "bool" : {
      "must" : {
        "range" : {
          "exportTimeStamp" : {
            "from" : 1523826005514,
            "to" : 1523862005514,
            "include_lower" : true,
            "include_upper" : true
          }
        }
      }
    }
  },
  "aggregations" : {
    "portName" : {
      "terms" : {
        "field" : "portName",
        "size" : 0
      },
      "aggregations" : {
        "dataPoints" : {
          "date_histogram" : {
            "field" : "exportTimeStamp",
            "interval" : "5m",
            "min_doc_count" : 0
          },
          "aggregations" : {
            "Max" : {
              "max" : {
                "field" : "ingressPackets"
              }
            },
            "Der" : {
              "derivative" : {
                "buckets_path" : [ "Max" ],
                "gap_policy" : "insert_zeros"
              }
            },
            "CumSum" : {
              "cumulative_sum" : {
                "buckets_path": "Der"
              }
            }
          }
        }
      }
    }
  }
}

I am getting the following error. I suspect it is because no derivative is populated for the first data point:

{
    "error": {
        "root_cause": [],
        "type": "reduce_search_phase_exception",
        "reason": "[reduce] ",
        "phase": "fetch",
        "grouped": true,
        "failed_shards": [],
        "caused_by": {
            "type": "null_pointer_exception",
            "reason": null
        }
    },
    "status": 503
}

Result without the "CumSum" aggregation (snippet):

"buckets": [
                            {
                                "key_as_string": "2018-04-16T06:20:00.000Z",
                                "key": 1523859600000,
                                "doc_count": 1,
                                "Max": {
                                    "value": 58
                                }
// I think the problem is here: the first bucket has no derivative, so adding the cumulative sum aggregation throws the NPE.
                            },
                            {
                                "key_as_string": "2018-04-16T06:25:00.000Z",
                                "key": 1523859900000,
                                "doc_count": 3,
                                "Max": {
                                    "value": 169
                                },
                                "Der": {
                                    "value": 111
                                }.....

How can I avoid the NPE in this case?

I think you're right... that first bucket is probably breaking the cusum. I think this is a similar issue to https://github.com/elastic/elasticsearch/issues/27544, except with a null value instead of an empty bucket.

Could you try running this version with a script instead to see if it fixes the problem?

{
  "query" : {
    "bool" : {
      "must" : {
        "range" : {
          "exportTimeStamp" : {
            "from" : 1523826005514,
            "to" : 1523862005514,
            "include_lower" : true,
            "include_upper" : true
          }
        }
      }
    }
  },
  "aggregations" : {
    "portName" : {
      "terms" : {
        "field" : "portName",
        "size" : 0
      },
      "aggregations" : {
        "dataPoints" : {
          "date_histogram" : {
            "field" : "exportTimeStamp",
            "interval" : "5m",
            "min_doc_count" : 0
          },
          "aggregations" : {
            "Max" : {
              "max" : {
                "field" : "ingressPackets"
              }
            },
            "Der" : {
              "derivative" : {
                "buckets_path" : [ "Max" ],
                "gap_policy" : "insert_zeros"
              }
            },
            "cusum_script": {
              "bucket_script": {
                "buckets_path": {
                  "deriv": "Der"
                },
                "script": {
                  "source": "if (params.deriv != null) {params.accumulator.value += params.deriv;} return params.accumulator.value;",
                  "params": {
                    "accumulator": {
                      "value": 0
                    }
                  }
                }
              }
            }
          }
        }
      }
    }
  }
}

Basically, it uses a bucket_script to emulate the cusum agg, but has a check for null values first. I think I see the bug in the code, so if this fixes your issue I'll work on a bugfix for the cusum agg itself.
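
The null check can be sketched in plain Python. This is only an illustration of the logic, not Elasticsearch code: `cusum_step` and the `accumulator` dict are hypothetical stand-ins for the script and its `params.accumulator`, and the sketch assumes the accumulator persists from one bucket to the next. This sketch adds the derivative value itself to the running total, which is what a cumulative sum needs.

```python
from typing import Dict, Optional

def cusum_step(deriv: Optional[float], accumulator: Dict[str, float]) -> float:
    """One per-bucket call: add the derivative only when the bucket has one."""
    if deriv is not None:
        accumulator["value"] += deriv
    return accumulator["value"]

accumulator = {"value": 0}
derivs = [None, 111, 31]  # first bucket has no derivative, as in the result above
running = [cusum_step(d, accumulator) for d in derivs]
```

The first bucket simply reports the starting value of the accumulator instead of failing on the missing derivative.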

I opened a PR to fix this particular bug: https://github.com/elastic/elasticsearch/pull/29641

Hi Zachary Tong,

Thanks for checking on this.

I have tried your suggestion:

{
  "query" : {
    "bool" : {
      "must" : {
        "range" : {
          "exportTimeStamp" : {
            "from" : 1524359580418,
            "to" : 1524467580418,
            "include_lower" : true,
            "include_upper" : true
          }
        }
      }
    }  
  },
  "aggregations" : {
    "applicationName" : {
      "terms" : {
        "field" : "portName",
        "size" : 0
      },
      "aggregations" : {
        "dataPoints" : {
          "date_histogram" : {
            "field" : "exportTimeStamp",
            "interval" : "5m",
            "min_doc_count" : 0
          },
          "aggregations" : {
            "Max" : {
              "max" : {
                "field" : "ingressPackets",
                "missing" : 0
              }
            },
            "Der" : {
              "derivative" : {
                "buckets_path" : [ "Max" ],
                "gap_policy" : "skip"
              }
            },
             "cusum_script": {
              "bucket_script": {
                "buckets_path": {
                  "deriv": "Der"
                },
                "script":  "if (params.deriv != null) {params.accumulator.value += 1;} return params.accumulator.value;",
                "params": {
                    "accumulator": {
                      "value": 0
                    }
                  }
              }
            }
          }
        }
      }
    }
  }
}

I am getting the following error:

{
  "error": {
    "root_cause": [
      {
        "type": "search_parse_exception",
        "reason": "Unknown key for a START_OBJECT in [cusum_script]: [params].",
        "line": 47,
        "col": 121
      }
    ],
    "type": "search_phase_execution_exception",

Please suggest.
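
Comparing the two requests, the suggested version nests "params" inside the "script" object, while the request above passes "script" as a plain string and leaves "params" as a sibling key under "bucket_script", which is the key the parser rejects. Below is a sketch of the nested layout, built as a Python dict so the structure can be checked before sending. The helper name is hypothetical, the script text here adds the derivative to the accumulator, and the name of the script-text key is an assumption: the suggestion uses "source", while older releases may expect "inline", so check the documentation for your version.

```python
import json

def build_cusum_script(script_text: str, script_key: str = "source") -> dict:
    """Build the bucket_script aggregation with params nested inside script."""
    return {
        "bucket_script": {
            "buckets_path": {"deriv": "Der"},
            "script": {
                script_key: script_text,              # key name depends on ES version (assumption)
                "params": {"accumulator": {"value": 0}},  # nested here, not under bucket_script
            },
        }
    }

script_text = (
    "if (params.deriv != null) {params.accumulator.value += params.deriv;} "
    "return params.accumulator.value;"
)
cusum_agg = build_cusum_script(script_text)
cusum_json = json.dumps({"cusum_script": cusum_agg}, indent=2)
```

The resulting "cusum_script" object replaces the one in the request above; whether the script language on an older cluster accepts the `params.` prefix is a separate question that this sketch does not settle.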
