Elasticsearch Apply filters with results from aggregations

Is there anyway I can using the results obtained from aggregations to filter out the final hits in the query?

I want to obtain a list of users who have more than 2 devices and the list of their devices in the database. The device count can be done using aggregations, however I'm having a hard time trying to figure out how to use that results to apply on the final hits.

I thought about using post_filter but it didn't seem to work.

Here my code

{
"query": {
    "bool": {
            "filter": dt_filter, 
            "must": {"term": {"username": "cakenoodle"}}
        }
    }, 
    "aggs": {
        "user_count": {
            "aggs": {
                "nb_device": {"cardinality": {"field": "device_uuid"}},
                "nb_ip_addr": {"cardinality": {"field": "ip.address"}}, 
                "sus_count": {
                    "bucket_selector": {
                        "buckets_path": {
                            "nb_device": "nb_device",
                            "nb_ip_addr": "nb_ip_addr",
                        },
                        "script": "params.nb_device >= 2 && params.nb_ip_addr >= 2"
                    }
                }
            },
            "terms": {
                "field": "username"
            }
        },
    }, 
    "post_filter": {"range": {"nb_device": {"gte": 2}}} // didn't work here
}

The equivalence of SQL would be something like this:

WITH 
device_count AS (
    SELECT 
        user, 
        COUNT(device_id) nb_device
    FROM table
    GROUP BY user
    HAVING COUNT(device_id) >= 2
)

SELECT 
    table.user, 
    table.device
FROM table
    JOIN device_count ON device_count.user = table.user

To obtain list of devices, add terms aggregation on devices as sub-aggregation for username terms aggregation.

Something like this:

GET kibana_sample_data_flights/_search
{
  "size":0,
  "aggs":{
    "city":{
      "terms":{
        "field": "DestCityName",
        "size": 10000
      },
      "aggs":{
        "airports":{
          "terms":{
            "field": "DestAirportID"
          }
        },
        "nb_airport": {
          "cardinality": {
            "field": "DestAirportID"
          }
        },
        "nb_airport_filter":{
          "bucket_selector":{
            "buckets_path":{
              "nb_airport": "nb_airport"
            },
            "script": "params.nb_airport >= 2"
          }
        },
        "sort":{
          "bucket_sort": {
            "sort": [
              {"nb_airport":{"order":"desc"}}
              ]
          }
        }
      }
    }
  }
}

You will get:

{
  "aggregations" : {
    "city" : {
      "doc_count_error_upper_bound" : 0,
      "sum_other_doc_count" : 0,
      "buckets" : [
        {
          "key" : "London",
          "doc_count" : 329,
          "nb_airport" : {
            "value" : 3
          },
          "airports" : {
            "doc_count_error_upper_bound" : 0,
            "sum_other_doc_count" : 0,
            "buckets" : [
              {
                "key" : "LTN",
                "doc_count" : 130
              },
              {
                "key" : "LGW",
                "doc_count" : 111
              },
              {
                "key" : "LHR",
                "doc_count" : 88
              }
            ]
          }
        },
        {
          "key" : "Rome",
          "doc_count" : 191,
          "nb_airport" : {
            "value" : 3
          },
          "airports" : {
            "doc_count_error_upper_bound" : 0,
            "sum_other_doc_count" : 0,
            "buckets" : [
              {
                "key" : "FCO",
                "doc_count" : 89
              },
              {
                "key" : "RM11",
                "doc_count" : 70
              },
              {
                "key" : "RM12",
                "doc_count" : 32
              }
            ]
          }
        },......

The result contains DestAirportID for each DestCityName matching the condition of bucket_selector.

1 Like

Thanks for your support. I just have 2 more following questions regarding this:

  1. When I set the size = 10000, an error TransportError: TransportError(503, 'search_phase_execution_exception') was raised. The maximum size I could use was 2000. How can I increase the size?
  2. Is there a way to know how many aggregated results will be returned? How can I paginate if I have more than 10000 (or 2000 in my case) in size?

Searching Google hasn't given me any satisfying answers yet...

See Size section in the terms aggregation doc. The default value of search.max_buckets is 65,536. Take care that someone set the value to 2,000 with some intention.

As described in the doc, you can use composite aggregation to pagenate on the aggregation result.

Actually, it also happened with the same error message when I set the size to 1000. Not sure what caused the error since it seemed to be okay before.

I need full query and error message to consider about the reason.

Here my code:

    "aggregations": {
        "unique_users": {
            "terms": {
                "field": "username", 
                "size": 500
            },

            "aggs": {
                "device_uuid": {"terms": {"field": "device_uuid"}},
                "ip_address": {"terms": {"field": "ip.address"}}, 
                
                "nb_device": {"cardinality": {"field": "device_uuid"}},
                "nb_ip_add": {"cardinality": {"field": "ip.address"}},
                "nb_device_filter": {
                    "bucket_selector": {
                        "buckets_path": {
                            "nb_device": "nb_device",
                            "nb_ip_add": "nb_ip_add"

                        }, 
                        "script": "params.nb_device >= 2 && params.nb_ip_add >= 2"
                    }
                }
            }
        }
    }

This is the error message: TransportError: TransportError(503, 'search_phase_execution_exception')

I experimented using different size, now none seems to work.

Is that the entire error message?

This topic was automatically closed 28 days after the last reply. New replies are no longer allowed.