Aggregating transaction data for payments

I want to know how many payments in my environment have a given status at a given time, broken down by direction and protocol.

In our case, a payment is a collection of transactions (docs) with the same paymentId. The document with the most recent timestamp is considered to be the 'current' status.

Here are some sample documents:

{
  "status": "Pending",
  "paymentId": "a1",
  "@timestamp": "2021-09-08T14:10:01",
  "direction": "Inbound",
  "protocol": "SWF"
},
{
  "status": "Exception",
  "paymentId": "a1",
  "@timestamp": "2021-09-08T15:10:01",
  "direction": "Inbound",
  "protocol": "SWF"
},
{
  "status": "Completed",
  "paymentId": "a1",
  "@timestamp": "2021-09-08T16:10:01",
  "direction": "Inbound",
  "protocol": "SWF"
}
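To make the 'current status' rule concrete, here is a minimal sketch in plain Python (not Elasticsearch) that applies it to the three sample documents: keep the latest document per paymentId, then count by direction, protocol and status. The function name and the optional `as_of` cutoff are made up for illustration, and the sketch assumes all timestamps share the same ISO-8601 format so that string comparison matches time order.

```python
from collections import Counter

# The three sample documents from above.
docs = [
    {"status": "Pending", "paymentId": "a1", "@timestamp": "2021-09-08T14:10:01",
     "direction": "Inbound", "protocol": "SWF"},
    {"status": "Exception", "paymentId": "a1", "@timestamp": "2021-09-08T15:10:01",
     "direction": "Inbound", "protocol": "SWF"},
    {"status": "Completed", "paymentId": "a1", "@timestamp": "2021-09-08T16:10:01",
     "direction": "Inbound", "protocol": "SWF"},
]


def current_status_counts(docs, as_of=None):
    """Count payments by (direction, protocol, status) using each payment's latest doc.

    as_of is a hypothetical optional cutoff: docs with a later timestamp are ignored.
    """
    latest = {}
    for doc in docs:
        if as_of is not None and doc["@timestamp"] > as_of:
            continue
        seen = latest.get(doc["paymentId"])
        if seen is None or doc["@timestamp"] > seen["@timestamp"]:
            latest[doc["paymentId"]] = doc
    return Counter(
        (d["direction"], d["protocol"], d["status"]) for d in latest.values()
    )


print(current_status_counts(docs))
print(current_status_counts(docs, as_of="2021-09-08T15:30:00"))
```

With no cutoff, payment a1 counts once as Inbound / SWF / Completed; with the cutoff at 15:30:00 it counts as Exception instead.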

There might be more than 20,000 payments of a specific type in a search window, since the window is defined on demand by business users.

Below is the aggregation I put together for this problem:

GET test-pmt/_search
{
  "size": 0, 
  "aggs": {
    "by_direction": {
      "terms": {
        "field": "direction.keyword",
        "size": 2
      },
      "aggs": {
        "by_protocol": {
          "terms": {
            "field": "protocol.keyword",
            "size": 3
          },
          "aggs": {
            "by_pmt": {
              "terms": {
                "field": "paymentId.keyword",
                "size": 20000
              },
              "aggs": {
                "by_recent_timestamp": {
                  "terms": {
                    "field": "@timestamp",
                    "size": 1,
                    "order": {
                      "_key": "desc"
                    }
                  },
                  "aggs": {
                    "by_status": {
                      "terms": {
                        "field": "status.keyword",
                        "size": 1
                      }
                    }
                  }
                }
              }
            }
          }
        }
      }
    }
  }
}

All of this comes down to a few questions:

  1. Is the 20000 bucket limit on a terms aggregation for the entire payload, or just for the local aggregation? (In other words, is the limit 20000, or is it parent buckets * child buckets * child sub-buckets?)
  2. Am I going about solving this problem the right way? This is a fairly common use case, and while transforms would remove some of the problems here, they don't allow for the variability of searching within different time windows.
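For a sense of scale on question 1, here is a rough Python sketch of the multiplication being asked about. It takes the `size` values from the aggregation above and assumes the worst case where every terms aggregation fills up to its size. It does not say which limit Elasticsearch actually enforces; it only shows how large the total gets if buckets are counted across all levels.

```python
def worst_case_buckets(sizes):
    """Sum the buckets created at every nesting level, outermost first."""
    total = 0
    parents = 1
    for size in sizes:
        parents *= size   # buckets at this level = parent buckets * own size
        total += parents
    return total


# direction, protocol, paymentId, @timestamp, status
sizes = [2, 3, 20000, 1, 1]
print(worst_case_buckets(sizes))  # 360008
```

In practice a payment has one direction and one protocol, so the real number is bounded by the count of distinct payments rather than by this worst case.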

One idea to make a transform work: you could use a scripted_metric aggregation to create a status change log:

"status_history": {
  "2021-09-08T14:10:01": "Pending",
  "2021-09-08T15:10:01": "Exception",
  "2021-09-08T16:10:01": "Completed"
}

At query time you can use this recorded changelog to emit the status that falls into your desired time range, either via a script as part of a terms aggregation or using a query-time runtime field (similar to this but with an additional upper bound).

The benefit of this: You can still reduce a lot of data in the transform.

Would having a timestamp as a key not lead to mapping explosion? Might it make more sense to have the status as field name and the timestamp as value?

Good point. status_history could be mapped to the flattened field data type. There is no need to make this field aggregatable.

I assumed that exceptions can occur multiple times. That's why swapping key and value would lead to overwrites, unless you put the timestamps into an array.
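A small Python sketch of that overwrite concern, using plain dicts rather than an Elasticsearch mapping. The two later events are hypothetical, added only so that a status repeats.

```python
events = [
    ("2021-09-08T14:10:01", "Pending"),
    ("2021-09-08T15:10:01", "Exception"),
    ("2021-09-08T15:40:01", "Pending"),    # hypothetical: payment retried
    ("2021-09-08T16:10:01", "Exception"),  # hypothetical: failed again
]

# Timestamp as key: every status change is kept.
by_timestamp = {ts: status for ts, status in events}

# Status as key with a single value: a repeated status overwrites the earlier one.
by_status = {status: ts for ts, status in events}

# Status as key with a list value: nothing is lost, at the cost of an array per status.
by_status_list = {}
for ts, status in events:
    by_status_list.setdefault(status, []).append(ts)

print(len(by_timestamp))            # 4
print(by_status)
print(by_status_list["Exception"])
```

The single-value variant ends up with only two entries, so the first Pending and the first Exception are gone.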

I was able to put together a solution based on your notes. The code blocks are below:

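transform:

"aggs": {
  "status_history": {
    "scripted_metric": {
      "init_script": "state.status_history = new HashMap();",
      "map_script": """
        // Record each transaction's status under its timestamp.
        def c_date = doc['@timestamp'].getValue().toString();
        def c_status = doc['status.keyword'].getValue();
        state.status_history.put(c_date, c_status);
      """,
      "combine_script": "return state",
      "reduce_script": """
        // Merge the per-shard maps instead of keeping only the first one.
        def merged = new HashMap();
        for (s in states) {
          if (s != null) { merged.putAll(s.status_history); }
        }
        return merged;
      """
    }
  }
}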

Runtime mapping script source:

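// Search window as ISO-8601 strings; both bounds are exclusive.
def min_time = '2021-09-08T14:10:01';
def max_time = '2021-09-08T14:30:01';

// status_history is the timestamp -> status map written by the transform.
// Fall back to an empty map if a document has no history.
def history = params['_source']['status_history'];
def myHash = history == null ? new HashMap() : new HashMap(history);

def max_key = '';
def max_value = 'empty';

for (entry in myHash.entrySet()) {
    def key = entry.getKey();
    def value = entry.getValue();
    // Timestamps in the same ISO-8601 format sort lexicographically,
    // so plain string comparison is enough here.
    int i = min_time.compareTo(key);
    int j = max_time.compareTo(key);
    // Keep the latest entry that lies strictly inside the window.
    if (i < 0 && j > 0 && key.compareTo(max_key) > 0) {
        max_key = key;
        max_value = value;
    }
}

// Emit the latest status inside the window, or 'empty' if there is none.
emit(max_value);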
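
For checking the window logic outside Elasticsearch, here is a Python sketch that mirrors the script above. The function name is made up, and the history uses the keys from the changelog example earlier in the thread; the keys a real transform writes may carry a fraction or zone suffix, which is not modelled here.

```python
def status_in_window(status_history, min_time, max_time):
    """Return the latest status strictly inside (min_time, max_time), else 'empty'."""
    max_key = ""
    max_value = "empty"
    for key, value in (status_history or {}).items():
        # Same-format ISO-8601 strings compare in time order.
        if min_time < key < max_time and key > max_key:
            max_key = key
            max_value = value
    return max_value


history = {
    "2021-09-08T14:10:01": "Pending",
    "2021-09-08T15:10:01": "Exception",
    "2021-09-08T16:10:01": "Completed",
}

print(status_in_window(history, "2021-09-08T14:30:01", "2021-09-08T15:30:01"))  # Exception
print(status_in_window(history, "2021-09-08T14:10:01", "2021-09-08T14:30:01"))  # empty
```

The second call returns 'empty' because the lower bound is exclusive: with keys written exactly like this, the Pending entry at 14:10:01 is not strictly after the window start.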