Large aggregate (too_many_buckets_exception)

I have an index with millions of documents, most of which contain a hash value (MD5).
I want to group by the hash value, calculate the count of documents per hash, and then sum the total count, but only for buckets with at least 2 documents.

I do this using Kibana and Elasticsearch (7.1). I got this working, but for this particular set I have more than 800K group-by results (buckets), so Elasticsearch runs into a too_many_buckets_exception.

I know I can increase the max_bucket value, but as far as I have found out this is something you shouldn't do. Also, in the future the 800K may easily become 2 million buckets or more.

How can I get this metric without having to increase the max_bucket value? Coming from SQL, this seems like a relatively easy question to me.

Hi Roel,

If you're used to putting everything on one machine then life certainly is easy. In a distributed system like most Elasticsearch deployments, you have to deal with what we call the FAB conundrum, which means you have to pick a trade-off.

The trade-offs come from physical limits (network speeds, RAM limitations, etc.) and there are various options. Try running this wizard and see which option it leads you to, and we can discuss further here.


Hi Mark,

Thanks for your quick response and the great wizard. It seems I was already looking in the right direction, as I have read about partitioning and composite aggregation. However, I can't figure out how to do this in Kibana. By the way, the wizard pointed me to partitioning.

Composite will only ever give you buckets in the order of a key (but the key can be made from multiple values in a single doc).
If your required sort order is unrelated to the key, e.g. a different value like a sum, average, or count computed from multiple docs, then life gets more complex because the related docs may be on many different machines and there may be many grouping keys to consider (too many to fit comfortably into a response's RAM). At this point your options become:

  1. Look at a common subset of the distributed keys in each request (term partitioning) or
  2. Put related data close to each other:
    a) Use "routing" to ensure all the same-keyed docs end up on the same machine
    b) Put all related info into the same doc ("entity centric indexing")
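For option 2a, the routing can be set per document at index time. A minimal sketch, using the same placeholder convention as below (index name, doc ID, and hash value are placeholders to fill in):

GET is not needed here; you index with an explicit routing value:

PUT /{index_name}/_doc/{doc_id}?routing={md5_value}
{
  "md5": "{md5_value}"
}

All documents indexed with the same routing value land on the same shard, so a shard-level aggregation over them is complete and accurate.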

Can you say more about why you need to page through all of the results? As you can guess, it's not a straightforward problem for any system.

I'm not entirely sure about what you mean, but I'll try to explain.

I have a set of documents with an MD5 hash for each document. I want to calculate the total number of documents which match another document. So I do a terms aggregation on the hash, count the documents per hash, and sum the counts to get the total. But this results in the too_many_buckets exception.

So I guess, to answer your question: going through all of them is required in this case to get a total..?

It might help to "zoom out" to the business problem you're trying to solve.

For example - if you were just trying to avoid duplicate docs maybe using the MD5 as the doc ID would be a straightforward way to avoid duplicates?

Good point. Well, in this case it is actually to find and report on those duplicates, hence the difficulty. But partitioning seems like the answer, because it would mean I keep summing until I have the total number. Or am I wrong?

And would this be possible in Kibana?

Term partitioning is essentially a way to limit RAM when gluing results together from many machines to derive things like counts or averages for some keys.

If the set of keys to consider is too large for a single request then you need to find a way to break that set of keys into multiple partitions that can be considered in separate requests.
You can think of it like processing just the MD5s that start with the letter "A", then running another request for the ones that start with "B", and so on. Each request should cover a sufficiently small subset of keys that you can gather all the related data from the machines without running out of RAM when fusing a response. Rather than using the first letter of a key to partition into groups, though, we compute the hash of the key modulo the number of required partitions to see whether the key lands in the requested partition.
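As a sketch of what one such partitioned request might look like (index and field names are placeholders; this asks for partition 0 of 20, and you would repeat the request with partition 1, 2, and so on up to 19, summing the results client-side):

GET /{index_name}/_search
{
  "size": 0,
  "aggs": {
    "hashes": {
      "terms": {
        "field": "{hash_value_field_name}",
        "include": {
          "partition": 0,
          "num_partitions": 20
        },
        "size": 50000,
        "min_doc_count": 2
      }
    }
  }
}

Each request then only has to hold roughly 1/20th of the unique hashes in memory at once.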

Thanks again Mark, that makes sense. It is like a paging mechanism, right? But is there any way to make Kibana do this: fetch the first X buckets, calculate the metrics, fetch the next X buckets, calculate those, and so on, and in the end sum the totals of the requests?

A little additional info:

I just tried answering the same question on the same data set in MongoDB. This also generates a very large set, but I can enable allowDiskUse to calculate the number in the end.

Kind of. Within a partition (like the MD5s starting with "A") you get the required arbitrary sort order.
However, there's no global sort order to the results. Partitions aren't sorted. You just have N arbitrarily decided subdivisions of your data, and within each of them you have a logically ordered subset of results.

As far as I know, no. If you fused the data at index time, i.e. created entity-centric single docs, one for each unique MD5 with a count of related docs on them, Kibana could work happily with those. The new "dataframes" feature in 7.2 might be able to help with that.
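As an illustration of the entity-centric idea, a data frame transform along these lines could pivot the index into one doc per unique MD5 with its doc count. This is only a sketch: index, field, and transform names are placeholders, and the exact API shape may differ in your version:

PUT _data_frame/transforms/{transform_id}
{
  "source": { "index": "{index_name}" },
  "dest": { "index": "{dest_index_name}" },
  "pivot": {
    "group_by": {
      "hash": {
        "terms": { "field": "{hash_value_field_name}" }
      }
    },
    "aggregations": {
      "doc_count": {
        "value_count": { "field": "{hash_value_field_name}" }
      }
    }
  }
}

The destination index then has one small doc per unique hash, which Kibana can aggregate and filter (e.g. doc_count >= 2) without ever building 800K buckets in a single request.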

Interesting. Spilling intermediate results to disk is not a strategy we've reached for yet - the emphasis to date has been on doing things fast in the constraints of RAM.

As you have multiple documents having the same hash it could possibly be a lot faster and easier for you if you could reindex the data into a new index and use the hash as a routing key. This will make sure that all documents with a specific hash end up in the same shard, which means you can perform the aggregation accurately at the shard level.
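A sketch of such a reindex, setting the routing from the hash field via a script (index and field names are placeholders):

POST _reindex
{
  "source": { "index": "{index_name}" },
  "dest": { "index": "{new_index_name}" },
  "script": {
    "source": "ctx._routing = ctx._source['{hash_value_field_name}']"
  }
}

After this, all docs sharing a hash live on the same shard, so per-hash counts computed at the shard level need no cross-shard merging.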

You may try this in Kibana:
GET /{index_name}/_search
{
  "query": {
    "match_all": {}
  },
  "track_total_hits": true,
  "collapse": {
    "field": "{hash_value_field_name}"
  },
  "aggregations": {
    "unique_hash_value_list": {
      "cardinality": {
        "field": "{hash_value_field_name}"
      }
    }
  }
}

The values of hits/total/value and aggregations/unique_hash_value_list/value are the total number of documents and the total number of unique hashes, respectively.

At this point, because there are not that many documents, I'm running with the out-of-the-box settings, meaning (I thought) one shard. Even when changing this, I think I'd still run into the 10K bucket limit. Or is it OK to change that limit if hardware resources are sufficient?

hi Yifeng,

Thanks for your response.

I just ran this query but it doesn't give the expected results. It gives the count of the unique hashes, whereas I need the sum of the count per hash.

Also I don't think there is a way to collapse using Kibana, but I might be wrong.

Hi Mark,

Thanks again. It would be nice if Kibana would support this somehow. At this point I think data fusion is one of the best options.

In addition to this, I do wonder why the query below gives a too_many_buckets error, as it is limited to 10000, which is equal to the max.

{
  "aggs": {
    "2": {
      "terms": {
        "field": "name",
        "order": {
          "_count": "desc"
        },
        "size": 10000,
        "min_doc_count": 2
      }
    }
  }
}

Check out shard_size.
By default, each shard is asked for more than size terms in order to improve accuracy.
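In other words, with size: 10000 the default shard_size is larger than size (roughly size * 1.5 + 10), which is what pushes an individual shard request past the 10K bucket limit even though size itself equals the max. Capping it explicitly would look something like this (at the cost of some accuracy in the top-terms selection):

{
  "aggs": {
    "2": {
      "terms": {
        "field": "name",
        "order": {
          "_count": "desc"
        },
        "size": 10000,
        "shard_size": 10000,
        "min_doc_count": 2
      }
    }
  }
}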

Hi Mark,

I just tried the data frame feature and it gives the expected results. It does make me wonder why this doesn't run into the bucket exception. I guess it splits things up, which might be a possibility for the original query as well: when using a data table visual you don't see all the data at once anyway, so you page through it regardless.

It might be something to implement?

As you suggest, it does multiple requests, using the 'composite' aggregation to group data under a key. Because it works through all keys in their natural sort order, it does not have to worry about sorting by some value derived from multiple docs, e.g. a cardinality count of some other field.
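For reference, a single page of such a composite request looks something like this (index and field names are placeholders). The after_key returned in each response is passed back as "after" inside the composite block of the next request, which is how all buckets get paged through:

GET /{index_name}/_search
{
  "size": 0,
  "aggs": {
    "by_hash": {
      "composite": {
        "size": 1000,
        "sources": [
          { "hash": { "terms": { "field": "{hash_value_field_name}" } } }
        ]
      }
    }
  }
}

Since each page holds only 1000 buckets, no single request ever approaches the max_buckets limit.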