High cardinality aggregation questions

Hello,

I'm currently working on a use case with an extremely high cardinality (couple of million combinations) and I'm having trouble understanding how ES behaves with aggregations.
Here are sample documents:

{
  "docid": 1,
  "departurecity": "New York",
  "arrivalcity": "London",
  "passengers": 5
},
{
  "docid": 2,
  "departurecity": "New York",
  "arrivalcity": "London",
  "passengers": 8
},
{
  "docid": 3,
  "departurecity": "Buenos Aires",
  "arrivalcity": "Mexico City",
  "passengers": 20
}

The goal is to get, out of all possible departure-arrival combinations, the TOP 100 most popular routes, i.e. the highest totals when summing "passengers" in the aggregation.
I'm currently using a composite aggregation as follows:

GET passengers-2020*/_search
{
  "size": 0,
  "aggs": {
    "my_buckets": {
      "composite": {
        "size": 65000,
        "sources": [
          {
            "departurecity": {
              "terms": {
                "field": "departurecity",
                "missing_bucket": false,
                "order": "asc"
              }
            }
          },
          {
            "arrivalcity": {
              "terms": {
                "field": "arrivalcity",
                "missing_bucket": false,
                "order": "asc"
              }
            }
          }
        ]
      },
      "aggregations": {
        "passengers": {
          "sum": {
            "field": "passengers"
          }
        },
        "sortCriteria": {
          "bucket_sort": {
            "sort": [
              {
                "passengers": {
                  "order": "desc"
                }
              }
            ],
            "from": 0,
            "size": 100,
            "gap_policy": "SKIP"
          }
        }
      }
    }
  }
}

I get some results from this aggregation.
Assume we now have the following documents instead:

{
  "docid": 1,
  "departure_arrival": "New York~London",
  "passengers": 5
},
{
  "docid": 2,
  "departure_arrival": "New York~London",
  "passengers": 8
},
{
  "docid": 3,
  "departure_arrival": "Buenos Aires~Mexico City",
  "passengers": 20
}

What I did was combine departure and arrival into a single field and run the same composite aggregation on that one field ("departure_arrival" instead of both "departurecity" and "arrivalcity"):

GET passengers-2020*/_search
{
  "track_total_hits": false, 
  "size": 0,
    "aggs": {
        "my_buckets": {
            "composite": {
                "size": 65000,
                "sources": [
                    {
                        "departure_arrival": {
                            "terms": {
                                "field": "departure_arrival",
                                "missing_bucket": false,
                                "order": "asc"
                            }
                        }
                    }]
            },
            "aggs": {
                "passengers": {
                    "sum": {
                        "field": "passengers"
                    }
                },
                "sortCriteria": {
                    "bucket_sort": {
                        "sort": [
                            {
                                "passengers": {
                                    "order": "desc"
                                }
                            }
                        ],
                        "from": 0,
                        "size": 100,
                        "gap_policy": "SKIP"
                    }
                }
            }
        }
    }
}

This gave me a completely different result.
Then I also tested a terms aggregation, which also gave me completely different results:

GET passengers-2020*/_search
{
  "track_total_hits": true,
  "size": 0,
  "aggs": {
    "my_buckets": {
      "terms": {
        "field": "departure_arrival"
      },
      "aggs": {
        "passengers": {
          "sum": {
            "field": "passengers"
          }
        },
        "sortCriteria": {
          "bucket_sort": {
            "sort": [
              {
                "passengers": {
                  "order": "desc"
                }
              }
            ],
            "from": 0,
            "size": 100,
            "gap_policy": "SKIP"
          }
        }
      }
    }
  }
}

The questions:
By combining the fields "departurecity" and "arrivalcity" into "departure_arrival", I was hoping to reduce query complexity and improve response time. Not only did it take more time to compute, it also gave me completely different results.

  • What is the best way for me to find the TRUE total number of passengers per departure-arrival combination?
  • Which mapping would be best adapted (separate departure/arrival or combined departure~arrival) for best query performance?
  • Which aggregation can I use to get the top N results?
  • Is it possible to get some intuition as to how the composite aggregation works under the hood in this use case?
  • How does the "size" parameter influence the composite and terms aggregations?

Generally speaking, accuracy and performance are improved by having better data locality.
In order of best to worst data locality:

  • All data related to a subject is held in a single doc
  • All data related to a subject is held in a single shard of an index on a single machine
  • All data related to a subject is spread across multiple shards/nodes
  • All data related to a subject is spread across multiple indices/shards/nodes
  • All data related to a subject is spread across multiple clusters/indices/shards/nodes

If you have poor data locality then some of the analytics you attempt with aggregations will produce inaccurate results, as Elasticsearch tries to produce results from snippets of data returned from multiple shards. We have some clever algorithms, but we can't always defy physics, so we sometimes work with approximations and return error bounds to you along with the results.

  • What is the best way for me to find the TRUE total number of passengers per departure-arrival combination?

Roll up the data using the transforms API. It gives the best data locality but may introduce a lag in update latency.

  • Which mapping would be best adapted (separate departure/arrival or combined departure~arrival) for best query performance?

It looks like you're interested in routes, so using departure+arrival as the subject key in the transforms API would be best. This "route" would be the entity in the entity-centric index that the transforms API would build from your event-centric flight data.
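
A minimal sketch of such a transform, assuming your source index pattern from the queries above; the transform id "top_routes" and the destination index name "routes" are hypothetical placeholders:

PUT _transform/top_routes
{
  "source": {
    "index": "passengers-2020*"
  },
  "dest": {
    "index": "routes"
  },
  "pivot": {
    "group_by": {
      "departurecity": { "terms": { "field": "departurecity" } },
      "arrivalcity": { "terms": { "field": "arrivalcity" } }
    },
    "aggregations": {
      "passengers": { "sum": { "field": "passengers" } }
    }
  }
}

POST _transform/top_routes/_start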

  • Which aggregation can I use to get the top N results?

A terms agg on an entity-centric routes index created using the transforms API would be the fastest and most accurate option.
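
In fact, because the transform writes one document per route with its total, the top 100 on such an index doesn't even need an aggregation; a plain sorted search does it (a sketch assuming the hypothetical "routes" destination index above):

GET routes/_search
{
  "size": 100,
  "sort": [
    { "passengers": { "order": "desc" } }
  ]
}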

  • Is it possible to get some intuition as to how the composite aggregation works under the hood in this use case?

The composite agg is for paging through large lists of values in key order (not ordered by number of passengers or anything else). Each page comes back in key order, so picking the top 100 by passenger count from a single page will miss combinations that fall outside that page, which likely explains the differing results you saw.
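
Paging works by feeding the after_key returned with each response back in via the after parameter; a sketch for your combined-field version (the after value here is purely illustrative):

GET passengers-2020*/_search
{
  "size": 0,
  "aggs": {
    "my_buckets": {
      "composite": {
        "size": 1000,
        "sources": [
          { "departure_arrival": { "terms": { "field": "departure_arrival" } } }
        ],
        "after": { "departure_arrival": "New York~London" }
      }
    }
  }
}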

  • How does the "size" parameter influence the composite and terms aggregations?

It can increase accuracy in the terms agg when there's poor data locality: each shard returns a bigger snippet of what it thinks might be useful to the final response.
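
For the terms agg the two knobs are size (how many buckets the final response keeps) and shard_size (how many candidates each shard contributes); a sketch, with show_term_doc_count_error included so you can see the per-term error:

GET passengers-2020*/_search
{
  "size": 0,
  "aggs": {
    "my_buckets": {
      "terms": {
        "field": "departure_arrival",
        "size": 100,
        "shard_size": 1000,
        "show_term_doc_count_error": true
      },
      "aggs": {
        "passengers": {
          "sum": { "field": "passengers" }
        }
      }
    }
  }
}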

See this wizard for a walk-through of the options.

Hello @galambo

I just noticed that @Mark_Harwood provided a quite exhaustive answer :sweat_smile:

I see in the first case you've used a composite aggregation.
Composite aggregations are the correct way to iterate over all the buckets of a terms-style aggregation.
We usually do not suggest setting size to 65000: it is usually better to paginate over the results, but for a small index it should work.

In the last query you've provided a terms aggregation without any size. Without specifying the size, it defaults to the "top" 10 terms. This behavior is explained in our documentation.
I would expect that in the aggregation response you'll have:

      "doc_count_error_upper_bound": N,   
      "sum_other_doc_count": M,   

Where at least M > 0. If that is the case, it means there are other terms to be explored and the size provided is not enough to cover them all.

If you want an exhaustive list, use a composite aggregation.

Even better, as Mark said, use Transform Jobs to make this data entity-centric, if you can accept results that are not real-time. Transform Jobs use composite aggregations behind the scenes.
A Transform Job can transform your original data and create one document per departure/destination pair with the metrics you want.
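
A quick way to check what such a job would produce, before actually creating it, is the preview endpoint (a sketch assuming the same pivot as in Mark's example above):

POST _transform/_preview
{
  "source": {
    "index": "passengers-2020*"
  },
  "pivot": {
    "group_by": {
      "departurecity": { "terms": { "field": "departurecity" } },
      "arrivalcity": { "terms": { "field": "arrivalcity" } }
    },
    "aggregations": {
      "passengers": { "sum": { "field": "passengers" } }
    }
  }
}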

As a side note: aggregating over keyword fields with high cardinality will generally build global ordinals. Keep an eye on the usage of fielddata in your cluster (documentation) using GET _cat/fielddata?v. There are efforts to avoid generating global ordinals when using composite aggregations (Avoid global ordinals in composite aggregation by ywelsch · Pull Request #74559 · elastic/elasticsearch · GitHub landed in 7.14.0, but got reverted in 7.15.1).

Hope this helps!

@Mark_Harwood @Luca_Belluccini thank you tons for your clear and thorough explanations! I will rethink my approach based on your advice :slight_smile:

Hello @Luca_Belluccini! Thanks to @Mark_Harwood, I used a transform job to get the true number of "departure~arrival" combinations, which is on the order of 1.5 million on my reduced dataset (so there should be 1.5 million different terms in the aggregation).
What I forgot to mention is that I used the terms aggregation above on a 1-shard index with size = 65000, and I do get "doc_count_error_upper_bound": N and "sum_other_doc_count": M in the response, with N on the order of 100 and M on the order of tens of millions.

You mention that M > 0 means there are other terms to be explored.

  1. Does it mean that M is the count of the documents that are part of the "not returned" terms?
  2. Were these M documents taken into account for the terms aggregation or skipped?

Now regarding N: what it represents is still unclear to me even after reading the documentation. I got N = 160 even though I queried an index with a single shard.

  1. Does it mean that one of the top terms is missing at most 160 documents?
  2. After enabling "show_term_doc_count_error": true, shouldn't that number appear at the term level as "doc_count_error_upper_bound" for the term in question?
  1. Does it mean that M is the count of the documents that are part of the "not returned" terms?
  2. Were these M documents taken into account for the terms aggregation or skipped?

M is the number of documents not considered in the aggregation.
N is the doc_count_error_upper_bound, which is described in our doc.

If you have high cardinality fields and you need ALL the terms, you need to use the composite aggregation and paginate over the results using the after key.

This information refers to the fact that, in a distributed system where the data is sharded (if using multiple shards), asking for the top N elements might produce an error in the document counts.
