Impact of client nodes on aggregations/Optimization of heavy aggregations

We have a massive, multi level aggregation that consumes a lot of memory and CPU in our ES cluster. The cluster consists of 3 master nodes and 4 data nodes. The heap size of each node is 8G, with 40% allocated to the field data cache. We have monthly indices with 6 shards each, and each index contains about 100.000.000 documents. The aggregation, which is 5 levels deep and consists of a mixture of terms, date_histogram, percentile and stats. As mentioned, running the aggregation consumes a lot of memory and CPU and impacts other index/search operations while it is running, which usually takes about a minute. With the current number of terms, one request consumes about 30-40% of the total heap.

First question:

  • Will adding "client" nodes (master: false, data: false), and routing aggregation requests through these nodes off load some of the processing of the data nodes, so indexing and other search requests can perform as normal on the data nodes?

Also, the number of documents and top level terms will continue to grow, so some time in the future, the number of top-level buckets will be a lot higher than today, which leads to the next questions:

  • Will ElasticSearch be able to handle an aggregation this complex at all, when the number of documents/terms increases?
  • What can we do to optimize the memory usage of multi level aggregations?



Nils-Helge Garli Hegvik

As far as aggregations are concerned, client nodes would only help for the so-called reduce phase, whose cost depends on the number of shards that are queried, and the size of your aggregations. Can you share your aggregation (anonymized if necessary), as well as information about the cardinality of all the fields that are involved in this aggregation?

More than one level of a terms aggregation on a high-cardinality field is probably something that elasticsearch will always struggle at, due to the way it is designed. However there are often ways to work around the issue by modeling data differently (eg. by merging two fields into one).

The documents are events belonging to a source. Something along the line of


  • source_id (integer-value, about 5000 different values, ranging between 100000 - 2000000)
  • event_time (date time)
  • property_a (integer value between 1-8)
  • property_b (double value)
  • property_c (double value)

The aggregation looks like this, inside a filtered query for the terms of source_id that should be included (currently between 5-20 terms, but could be several houndred in the future):

(I have tried to illustrate the nesting level with list indentation)

  • terms (source_id)
  • date_histogram (event_time/1h interval over a month)
    • terms (property_a)
      • missing (property_c)
      • stats (property_c)
      • percentile (property_c, 85, 95)
      • range (property_c, 20 groups)
      • range (property_b, 5 groups)
        • stats (property_c)

As mentioned, all this resides in a monthly index, with 6 shards.

How many terms (the value of the size parameter) does the top-level agg on source_id return? If the size value is small, you might want to give a try to the breadth_first collect_mode to see how it behaves.

Otherwise, I don't see much to be improved. When you say it takes one minute, is it running on all your 100M documents, or only a subset? If it runs on the same query all the time, you might want to have a look at the shard query cache ( and potentially increase the refresh interval so that cache entries live longer.

We do these aggregations within a filtered query, so the aggregations run for all the documents that matches the query. With the current dataset, that means about ~ 20 buckets for the top level terms aggregation and 20-30 million docs. Basically, we only use the buckets for grouping.

OK, thanks for the additional information. Then I don't think there is much that can be improved, this aggregation just happens to create many buckets because of the nesting. One way that you might be able to save memory would be to split thie request: eg. first run the top-level terms aggregation, and then for each individual term, add a filter to your filter_query for that term, and run the date_histogram and its sub aggs. This way, each aggregation should use about 1/20th of the memory of your current aggregation.