High-Cardinality Aggregation Alternate Approaches (2 million buckets)

Requirement: calculate a sum aggregate on a high-cardinality field of 2M unique values, order the results by the sum, and return the top 10 (or pages of 10 each).

We are moving some of our materialized-view-based data summarization queries from Postgres to Elasticsearch, and came across a need to aggregate a sum on a high-cardinality field (think email addresses). We have around 2 million unique values for this field.

In our version 7.1 cluster, we're hit by the default limit of 10,000 buckets imposed by the search.max_buckets cluster setting. We could of course raise it, but raising it to 2 million would probably put us in danger of bringing down the cluster. There is the possibility of many concurrent queries too.

We cannot use a composite aggregation to paginate the buckets here, because our requirement is to return the results ordered by sum, highest values first. We tried a bucket_sort pipeline aggregation alongside the composite as a means to do this, but hit a limit once the composite's size value goes over 10k.
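For concreteness, here is roughly the shape of the request we tried, expressed as a Python dict (field names like email_address and purchase_price are illustrative). The bucket_sort pipeline can only reorder buckets that the composite has already returned in the same response, so covering all ~2M terms would need a size far beyond search.max_buckets.

```python
# Sketch of the composite + bucket_sort attempt (field names illustrative).
request_body = {
    "size": 0,
    "aggs": {
        "group_by_email": {
            "composite": {
                "size": 10000,  # effectively capped by search.max_buckets
                "sources": [
                    {"email": {"terms": {"field": "email_address"}}}
                ],
            },
            "aggs": {
                "sum_purchase_price": {"sum": {"field": "purchase_price"}},
                "top_by_sum": {
                    # Sorts only the buckets present in THIS response,
                    # not the full 2M-term space.
                    "bucket_sort": {
                        "sort": [{"sum_purchase_price": {"order": "desc"}}],
                        "size": 10,
                    }
                },
            },
        }
    },
}
```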

I've seen some posts that generally say: "you can't do this in Elasticsearch. Instead pre-aggregate the data and dump it into a new entity-centric index".

Is that really the only approach when you need to post-process (e.g. sort) aggregated data that has more than 10k (or some similarly reasonable limit of) unique buckets?

Bump + further info:

So we headed down the path of an entity-centric index, basically to summarize metrics on a per-entity (e.g. user) basis. We even found the new (since 7.2) Transform API, which adds key features missing from the Reindex API and lets you create/populate a target index as an aggregation of a source index.

BUT then ... we realized that our use case requires the flexibility of dynamically choosing the dimensional filters on which the aggregation is based. An entity-centric index effectively requires you to answer one question (or a small related problem space of questions) that assumes a fixed set of dimensional filters. For example, you could do any ONE of the below, but not any arbitrary combination of them on the fly.

Fixing time:

  • summarize users only in the last month

Fixing product category:

  • summarize users only buying from these product categories

Fixing location:

  • summarize users only from these locations

Fixing amount:

  • summarize transactions only above a certain dollar amount

Dynamically querying for these aggregated values works on a small scale: the query-time filter values go in the "query": { "bool": { "filter": {...} } } section before the aggregation is applied, so we get potentially different count/sum aggregation values for each user entity depending on the dimensional filters selected (since each filter changes the set of documents being aggregated).

However, the above works only up to a certain number of buckets -- reasonably no more than 10k or 50k -- if we need to impose a sort order on those buckets.

To store aggregated values (e.g. purchase count, total expenditure, etc.) in an entity-centric index that can be queried quickly later on, we'd need to fix that dimensional filter ahead of time, which totally undermines the ability to query the data along different dimensions.

This appears to be an intractable use case for Elasticsearch: a hard limit on what it can do, and a question it can't answer in one go. A dead end. Please let me know if that is NOT the case and there are ways to accommodate this use case.

A hard limit on what it can do and a question it can't answer in one go. A dead end.

Distributed data makes this hard. But we have some different approaches. You might find this wizard helpful when choosing which is appropriate.

I think term partitioning might be useful in this case.

Ha, it seems you get these types of questions a lot -- that's a nice little JavaScript decision tree!

Answering the decision tree, it does indeed lead to trying partitioning.

So I tried a sample query like the below. Did I get that right?

Problem: The sort within each partition of buckets is correct, but the global sort across all buckets (i.e. across subsequent partitions) is not. Is this what you meant by "client-side stitching" (maybe you didn't say that; I thought I read it somewhere)? Is this just picking disjoint sets of 50 terms each, then, on a per-partition basis, asking the shards to provide a sum agg for each term in the given set, and ordering only among those 50?

i.e. will I have to iterate through all partitions and then re-sort the entire high-cardinality set of buckets on the client side to get an accurate sort order of buckets?

Assume this is a subset of the data with just under 3,000 unique users in it: 60 partitions at 50 buckets per partition = 3,000.

GET my_data/_search
{
  "size": 0,
  "aggs": {
    "group_by_email": {
      "terms": {
        "field": "email_address",
        "size": 50,
        "order": { "sum_purchase_price": "desc" },
        "include": {
          "partition": 0,
          "num_partitions": 60
        }
      },
      "aggs": {
        "sum_purchase_price": {
          "sum": {
            "field": "purchase_price"
          }
        }
      }
    }
  }
}

If I have to run through all partitions to get all buckets in order to compute a global sort and return the top-most buckets, how is this different from the composite query below, paging through everything with the after key and then sorting on the client side?

Does partitioning support sending parallel requests for multiple partitions at once, where composite cannot (due to its sequential paging)? That is, will requests for the same partition always return the same buckets if the docs have not changed?

Still, either approach is going to be too slow for our expected response times, and asks the client software to manage and sort 2M data points in this case.
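To make the "client-side stitching" concrete, here is a toy in-memory simulation. fetch_partition stands in for one terms-agg request using include.partition / include.num_partitions; the dataset, field names, and sizes are all invented for illustration.

```python
from collections import defaultdict

# 3,000 fake users with one purchase each (values invented).
DOCS = [
    {"email_address": f"user{i}@example.com", "purchase_price": (i * 7) % 100}
    for i in range(3000)
]
NUM_PARTITIONS = 60

def fetch_partition(partition):
    """Sum purchase_price per term, but only for terms whose hash falls
    in the requested partition -- mirroring what one request returns."""
    sums = defaultdict(int)
    for doc in DOCS:
        term = doc["email_address"]
        if hash(term) % NUM_PARTITIONS == partition:
            sums[term] += doc["purchase_price"]
    return dict(sums)

# Stitching: every partition must be fetched before the global sort is
# trustworthy, because any later partition can contain a bucket whose
# sum beats everything seen so far.
all_buckets = {}
for p in range(NUM_PARTITIONS):
    all_buckets.update(fetch_partition(p))

top10 = sorted(all_buckets.items(), key=lambda kv: kv[1], reverse=True)[:10]
```

Within a single process the partition assignment is deterministic (Elasticsearch likewise hashes each term to a stable partition), which is why repeated requests for the same partition return the same buckets while the underlying docs are unchanged.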

Similar composite query:

GET my_data/_search
{
  "size": 0, 
  "aggs": {
    "group_by_email": {
      "composite": {
        "size": 50,
        "sources": [
          {
            "email": {
              "terms": {
                "field": "email_address"
              }
            }
          }
        ]
      },
      "aggs": {
        "sum_purchase_price": {
          "sum": {
            "field": "purchase_price"
          }
        }
      }
    }
  }
}

You guessed it. :slight_smile: I always say aggregations are a simple request, like the "carry a chicken, fox and grain" problem: but the logistics of carrying it out are made much more complex by a constraint - a small boat (or in our case, limited memory for ferrying data across the network).

Composite doesn't prune uninteresting results and returns all values sorted by a grouping key, whereas with partitioning you can sort by some other order, like the sum or max of a different field organised under a common grouping key. That way you can avoid dragging all values to the client.

Yes, it is deterministic.

With only 3k users, that's manageable in one terms agg without partitioning, no? It would be below the default 10k buckets-per-request limit. Having only 50 terms in a partition is needlessly low.

Oh yes, absolutely. I was just testing the sort behavior of this query, and didn't want to use a full dataset. So my query/test was just on a small subset, to see if sort was global across all partitions (which it doesn't seem to be).

The real query would need to look like:

GET my_data/_search
{
  "size": 0,
  "aggs": {
    "group_by_email": {
      "terms": {
        "field": "email_address",
        "size": 1000,
        "order": { "sum_purchase_price": "desc" },
        "include": {
          "partition": 0,
          "num_partitions": 2000
        }
      },
      "aggs": {
        "sum_purchase_price": {
          "sum": {
            "field": "purchase_price"
          }
        }
      }
    }
  }
}

... to traverse all 2,000,000 buckets, so I know I definitely got e.g. the top 10 buckets out of 2M.

So please confirm this is how it would play out in this case:

In order to do a sum agg on each bucket and then pull out only the top 10, I'd need to:

  1. consider the default 10,000 limit for search.max_buckets
  2. pick a per-request bucket size safely under that: 1,000 here
  3. run a cardinality agg to count the unique buckets, and divide by 1,000 to get the number of partitions; just under 2M unique values yields about 2,000 partitions
  4. send 2,000 requests to ES to gather all the bucket sums
  5. run a client-side sort on those ~2M sum values
  6. pick out the top 10

... and then, when they want to see the next 10, either:

  7. keep all this data stored on the client side, or
  8. repeat the whole lengthy process again for the next 10

Is that how it would need to play out? I could see this maybe being okay for back-office analytics... but not in an online dashboard or site where people expect quick answers.
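The sizing arithmetic in steps 2-3 above can be written out directly; the exact cardinality estimate here is made up for illustration.

```python
import math

MAX_BUCKETS = 10_000                 # default search.max_buckets
buckets_per_request = 1_000          # comfortably under the limit
unique_terms = 1_987_543             # e.g. from a cardinality agg (approximate)
num_partitions = math.ceil(unique_terms / buckets_per_request)
requests_needed = num_partitions     # one terms-agg request per partition
```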

Deep, accurate, fast pagination is not practical for data distributed in this way.
We can't beat physics so you'd need to bring related data together - this could be either:

  1. Same-document (using transforms to create various entity-centric indices) or
  2. Same-shard (use the email address as a routing key to ensure all data for an email address ends up on the same shard or just store all data in one shard).

There are pros and cons to each of these. Option 2 means you can rely on accurate and fast aggregation, but can't paginate deeply in sum order. Option 1 has all the data pre-summed, so you can just sort by that field and paginate deeply.

That last part: I'm not sure if I understood it correctly. My tests seem to indicate that getting a sort across all 2M buckets would require dragging all values to the client. Fetching the first partition of 50 buckets did sort, but only among those 50 buckets; the next partition of 50 may yield a bucket with a greater sum than the greatest in the first partition.

Can you clarify a bit more how the routing key helps? I've come across a few docs suggesting that terms on its own (without partitions and without composite) may yield inaccurate sums/counts if size is too small. But in the cases above, the sum aggs should sum all documents for each term, across all shards, correct? Would the routing key simply reduce agg latency, since partial sums from each shard wouldn't have to be brought back to the coordinating node to complete each term's sum?

Here's the way to think of it. In the default scenario of randomly distributed docs and a request for the top N terms, each shard returns more than N of what it individually considers the best answers. These are summed and then trimmed on the coordinating node before being returned to the client. The problem is that one or more shards may not have returned stats for a term that was low-scoring on that shard but was ultimately returned because it was high-scoring on another shard.
This can be mitigated (up to a point) by having each shard return significantly more than N results, in the hope that this fills in some of the missing data for each term. Selecting terms by highest popularity or highest spend is typically accurate, but long-tail selections like least popular or lowest spend can be wildly inaccurate.

By using custom data routing each shard is guaranteed to have complete visibility of all stats for terms held on that shard. Accuracy is guaranteed.
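The effect can be shown with a toy model: three "shards", each returning only its local top-N term sums to a coordinator. The data and sizes here are invented, and the shard_size analogue is deliberately tiny (1) to make the failure mode visible.

```python
from collections import defaultdict

def shard_top_n(shard_docs, shard_size):
    """Each shard sums its own (term, value) docs and returns only its
    local top `shard_size` terms by sum."""
    sums = defaultdict(int)
    for term, value in shard_docs:
        sums[term] += value
    top = sorted(sums.items(), key=lambda kv: kv[1], reverse=True)
    return dict(top[:shard_size])

def coordinate(shards, n, shard_size):
    """Merge the partial sums each shard returned and trim to the top n."""
    merged = defaultdict(int)
    for shard in shards:
        for term, partial in shard_top_n(shard, shard_size).items():
            merged[term] += partial
    return sorted(merged.items(), key=lambda kv: kv[1], reverse=True)[:n]

# True totals: a=10, b=11, c=12, so the correct top term is "c".
# Random placement: "c" is low-scoring on every individual shard, so
# its partials get dropped and the coordinator wrongly reports "a".
random_shards = [
    [("a", 5), ("c", 4)],
    [("b", 9), ("c", 4)],
    [("a", 5), ("b", 2), ("c", 4)],
]
# Routed placement: all docs for a term live on one shard, so each
# local sum is already the complete sum and the answer is exact.
routed_shards = [
    [("a", 5), ("a", 5)],
    [("b", 9), ("b", 2)],
    [("c", 4), ("c", 4), ("c", 4)],
]
```

Here coordinate(random_shards, 1, 1) reports ("a", 10), while coordinate(routed_shards, 1, 1) correctly reports ("c", 12).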


What I take away from your explanation and a re-read of the relevant doc warnings is that a "top N" terms query is risky: shards may send back missing/under-represented term aggs, leading to inaccurate final sums/counts at the coordinator, which in turn can produce an incorrect sort when choosing the top N.

I understand inaccuracies due to approximation to be the case for A) below, but NOT for B) or C). Can you confirm?

  • A) A terms agg where shard_size < term cardinality, and no routing key for the term (... shard_size defaults to floor(1.5*size)+10)
  • B) A composite agg that uses a terms source
  • C) A terms agg that uses include.partition, include.num_partitions

Some sample tests across a small 3-node, 3-shard cluster seemed to show B) and C) did not suffer from the same inaccuracy problem with sum that A) did.

Sorting to get "top N" with B) or C) is a different challenge. To get an accurate "top N" buckets, it seems I'd need to pull all terms' buckets to the client side and compare them to get a global sort. To do those:

  • B) sequentially paginate all buckets one page at a time using the after key, pulling every one back to the client-side, and doing a global sort of the buckets' sums to get "top N"
  • C) requesting every partition from the cluster, each being a disjoint set of terms by way of a hashing the term against the partition numbers, accumulating those on the client-side, and doing a global sort of the buckets' sums to get "top N"

So to solve this use case with B) (composite) or C) (partitions), I'll need to siphon all buckets to the client first to do the final sorting pass, yes?

Now back to A). If documents with the same term are all sent to the same shard by way of a routing key, it does seem to solve the potentially inaccurate sums. Does a term-based routing key at index time also allow for "top N" queries to be answered without looking at all the buckets (server-side or client-side)?

Seems like it would because every shard could confidently send its top N candidates forward to the coordinator and let it sort out the final N. Am I missing some subtle corner case where a top N candidate wouldn't make it to the coordinator?

I'd just be left with a deep-pagination problem. E.g. if someone wanted page 100,001, to look at items 1,000,001-1,000,010 ... I'd need the server to process a terms agg with "size": 1000010 (ouch). Or fall back to B) or C) and make the client suffer instead of the server. Maybe a reasonable compromise here is to limit deep pagination, stopping at the 1,000th page (the 10,000th item, for pages of 10) to stay within the 10k search.max_buckets limit.

Sorry to ramble through this. Any inaccuracies in these assessments?

Good write up of the options.
The thing to consider with B vs C is the different distributed counting approaches.

When there are too many unique terms, the B and C strategies are used to try to get each shard to consider the same subset of the overall term space.
With B (composite), each shard focuses on gathering stats for the subset of terms that occur immediately after the 'after' value when sorted by the term value. This works because each shard can independently agree that term value X comes before Y, and so apply the same ranking logic for term selection. What they can't do is sort by a sum (e.g. all of X's sales), so the poor client has to page through every term alphabetically.
Option C (partitioning) makes each shard consider the same subset of terms because they can all independently agree that the hash of term X, modulo N, gives a partition number of 3, and that might be the partition currently under consideration. This is just like option A, except we've found a way to cut the index into arbitrarily small subdivisions, so we can afford to do sorts by sums of another field knowing we are more likely to be accurate. Sorting by sum means you may be able to avoid returning a large percentage of the terms to the client for each partition.
Bear in mind that option A with a suitable shard_size frequently works OK - you just need to pay attention to any non-zero error-margin values returned in the results.
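One way to picture the saving that sum-sorted partitions (option C) give over composite (option B): the client only has to keep a running top-N across partitions, never all 2M buckets. A minimal sketch, assuming each partition's buckets arrive already sum-sorted and accurate:

```python
import heapq

def running_top_n(partition_results, n=10):
    """Fold partition results into a global top n, holding at most n
    entries at any time instead of accumulating every bucket."""
    heap = []  # min-heap of (sum, term); smallest retained sum on top
    for buckets in partition_results:   # one list of (term, sum) per partition
        for term, total in buckets:
            if len(heap) < n:
                heapq.heappush(heap, (total, term))
            elif total > heap[0][0]:
                heapq.heapreplace(heap, (total, term))
    return sorted(heap, reverse=True)   # (sum, term) pairs, highest first
```

For example, running_top_n([[("a", 9), ("b", 5)], [("c", 7), ("d", 1)]], n=2) yields [(9, 'a'), (7, 'c')]. Since only the global top n can matter, each partition request also only needs to return its own top n buckets, which is exactly the pruning that sorting by sum enables.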
