ElasticSearch pick one product from each category with top score

I am trying to query ES index which contains product information, having product_id, category_id and variant_id fields. Each product belong to certain category and variant:

{
    "product_id" : "PRODUCT_12345",
    "category_id" : 1,
    "variant_id" : 5
}

I also have list of product_id and its scores:

[{'product_id': 'PRODUCT_46831', 'score': 1}, {'product_id': 'PRODUCT_47139', 'score': 0.95}, {'product_id': 'PRODUCT_46833', 'score': 0.8999999999999999}, {'product_id': 'PRODUCT_46834', 'score': 0.8499999999999999}, {'product_id': 'PRODUCT_46835', 'score': 0.7999999999999998}]

These scores are calculated using an algorithm and each product_id is present in ES. I want to filter the list such that only one product is selected from each category and variant. From each category and variant the product with highest score should be chosen. So for above list if PRODUCT_46831, PRODUCT_47139, PRODUCT_46833 belong to category 1 , PRODUCT_46834, PRODUCT_46835 belongs to category 2 , PRODUCT_46831, PRODUCT_47139 belongs to variant 1 and PRODUCT_46831, PRODUCT_46834, PRODUCT_46835 belongs to variant 2, the grouping on category will create list below, since PRODUCT_46831, PRODUCT_46834 are top scored product from there categories:

[PRODUCT_46831, PRODUCT_46834]

Further grouping PRODUCT_46831, PRODUCT_46834 on variant_id will create result :

[PRODUCT_46831]

since PRODUCT_46831, PRODUCT_46834 belong to same variant id 1 and PRODUCT_46831 has highest score in the list.

I tried using aggregation to form buckets for each category and then applying sort function with weight = score of product and picking top product, i was able to get list of products with top score in particular category_id , i am struggling to apply variant_id grouping on top of this list. Here is my query so far:

{
  "query": {
    "function_score": {
      "functions": [
        {
                "field_value_factor": 
                {
                    "field": "item_id",
                    "factor": 0
                }
            },
        {
          "filter": { "term": { "id": "PRODUCT_229648" } },
          "weight": 0.9
        },
        {
          "filter": { "term": { "id": "PRODUCT_108882" } },
          "weight": 0.95
        },
        {
          "filter": { "term": { "id": "PRODUCT_108881" } },
          "weight": 0.8
        },
        {
          "filter": { "term": { "id": "PRODUCT_172062" } },
          "weight": 0.95
        },
        {
          "filter": { "term": { "id": "PRODUCT_172060" } },
          "weight": 0.9
        }
      ],
      "score_mode": "sum",
      "boost_mode": "sum",
      
  "query" : {
  "bool" : {
    "must" : [
      {
        "terms" : {
          "id" : [
            "PRODUCT_229648",
            "PRODUCT_108882",
            "PRODUCT_108881",
            "PRODUCT_172062",
            "PRODUCT_172060"
          ],
          "boost" : 0
        }
      }
    ],
    "adjust_pure_negative" : true,
    "boost" : 0
  }
}
}
},
 "aggs" : {
      "category_id_max_product" : {
        "terms" : { "field": "category_id" },
        "aggs": {
            "max_score": {
              "top_hits": {
                "sort": [
                  {
                    "_score": {
                      "order": "desc"
                    }
                  }
                ],
                "_source": {
                  "includes": ["_id", "category_id", "variant_id", "_score"]
                },
                "size": 1
              }
            }
          }
        }
   }
, "_source": ["_id", "category_id", "variant_id", "_score"]
, "size": 0
}

Check out the collapse feature

Thanks for the reply, collapse helped me group on category_id, i want to perform another grouping with variant_id on result of collapse, could not find any examples which does that.

Have you seen Collapse search results | Elasticsearch Guide [7.14] | Elastic ?

yeah, had seen that, as i understood the second collapsing is applied within scope of each record created by first collapse , so when I collapse by category_id i get results for each category_id and within each category_id it will apply collapse on variant_id, what i want is when i collapse on variant_id, it should do collapse across all records.

I am able to do it using scripted metric, Not the optimsed one , but works:

{
  
  "query": {
    "function_score": {
      "functions": [
        {
          "filter": { "term": { "id": "PRODUCT_229648" } },
          "weight": 0.9
        },
        {
          "filter": { "term": { "id": "PRODUCT_108882" } },
          "weight": 0.95
        },
        {
          "filter": { "term": { "id": "PRODUCT_108881" } },
          "weight": 0.8
        },
        {
          "filter": { "term": { "id": "PRODUCT_172062" } },
          "weight": 0.95
        },
        {
          "filter": { "term": { "id": "PRODUCT_172060" } },
          "weight": 0.9
        },
        {
          "filter": { "term": { "id": "PRODUCT_216303" } },
          "weight": 0.95
        },
        {
          "filter": { "term": { "id": "PRODUCT_220975" } },
          "weight": 0.96
        }
      ],
      "boost_mode": "sum",
      
  "query" : {
  "bool" : {
    "must" : [
      {
        "terms" : {
          "id" : [
            "PRODUCT_229648",
            "PRODUCT_108882",
            "PRODUCT_108881",
            "PRODUCT_172062",
            "PRODUCT_172060",
            "PRODUCT_216303",
            "PRODUCT_220975"
          ]
        }
      }
    ],
    "adjust_pure_negative" : true
  }
}
}
}, 
"aggs": {
  "intentPathsCountAgg": {
            "scripted_metric": {
                "init_script": """state.messagesList = new ArrayList();
                  state.allVariantMap = new HashMap();
                  state.variantMap = new HashMap();
                  state.categoryMap = new HashMap();
                  state.emptyVariantMap = new ArrayList();
                  """,
                "map_script": """
                double score = _score;
                String key = String.valueOf(doc['variant_group_id'].value);
                Map map = [
                              'score' : score,
                              'category_id' : String.valueOf(doc['category_brand_id'].value),
                              'product_id' : doc['item_id'].value
                           ];
                
                if (state.variantMap.containsKey(key)) {
                  if (state.variantMap.get(key).score < score) {
                     state.variantMap.put(key, map);
                  }
                }
                else {
                     state.variantMap.put(key, map);
                }
                """,
                "combine_script": """
                return state.variantMap;
                """,
                "reduce_script": """
                  Map categoryMap = new HashMap();
                  for (state in states) {
                    for (String key : state.keySet()) {
                      String categoryKey = state.get(key).category_id;
                      double score = state.get(key).score;
                      long productId = state.get(key).product_id;
                      Map map = [
                                'score' : score,
                                'product_id' : productId
                             ];
                      if (categoryMap.containsKey(categoryKey)) {
                        if (categoryMap.get(categoryKey).score < score) {
                           categoryMap.put(categoryKey, map);
                        }
                      }
                      else {
                           categoryMap.put(categoryKey, map);
                      }
                    }
                  }
                  return categoryMap;
                """
            }
        }
}
    
   
, "_source": ["_id", "category_id", "variant_group_id", "item_id", "id"]
, "size": 0
}