Hi,
I'm trying to paginate over a composite aggregation in Java. I have some working code, but it doesn't seem very efficient, because it rebuilds the whole query on every page. Is there a way to re-use the query and change only the "afterKey" parameter?
Here is the code that I tried:
/**
 * Pages through the "by_id" composite aggregation by calling
 * {@link #getProfiles(RestHighLevelClient, int)} repeatedly, feeding each
 * page's after-key into the next request, until a page comes back with no
 * buckets. Prints the first and last after-key and the total elapsed time.
 *
 * @param args unused
 * @throws IOException if a search request or closing the client fails
 */
public static void main(String[] args) throws IOException {
    Instant before = Instant.now();
    // try-with-resources guarantees the client (and its underlying HTTP
    // connections) is released even when a search request throws.
    try (RestHighLevelClient client = new RestHighLevelClient(
            RestClient.builder(new HttpHost("localhost", 9200, "http"),
                    new HttpHost("localhost", 9201, "http")))) {
        // -1 is used as the starting after-key.
        // NOTE(review): this presumes every "id" value is greater than -1 — confirm.
        int searchAfterKey = -1;
        System.out.println("After key start: " + searchAfterKey);
        boolean emptyBuckets = false;
        while (!emptyBuckets) {
            Map<Object, Object> responseMap = getProfiles(client, searchAfterKey);
            emptyBuckets = (Boolean) responseMap.get("emptyBuckets");
            if (!emptyBuckets) {
                // "afterKeyVal" is only present when the page contained buckets.
                searchAfterKey = (Integer) responseMap.get("afterKeyVal");
            }
        }
        System.out.println("After key end: " + searchAfterKey);
    }
    // Measured after the client is closed, so the close is included in the total.
    Instant after = Instant.now();
    long delta = Duration.between(before, after).toMillis();
    System.out.println("Time taken: " + delta + " ms.");
}
/**
 * Runs one page of the "by_id" composite aggregation against the
 * "from_dynamo" index, starting after the given key, and prints the source
 * of every top hit found in the returned buckets.
 *
 * @param client         client used to execute the search
 * @param searchAfterKey value placed under "agg_on_id" in the aggregation's after-key
 * @return a map holding "emptyBuckets" (Boolean) and, only when the page
 *         contained buckets, "afterKeyVal" (Integer) for the next page
 * @throws IOException propagated from {@code client.search}
 */
private static Map<Object, Object> getProfiles(RestHighLevelClient client, int searchAfterKey) throws IOException {
    // Filter: effectiveDate in [0, 1575518400), wrapped in a constant-score query.
    QueryBuilder dateRange = QueryBuilders.rangeQuery("effectiveDate").from(0).to(1575518400, false);
    ConstantScoreQueryBuilder filter =
            new ConstantScoreQueryBuilder(QueryBuilders.boolQuery().must(dateRange));

    // Composite aggregation with a single terms source on "id".
    List<CompositeValuesSourceBuilder<?>> compositeSources = new ArrayList<>();
    compositeSources.add(new TermsValuesSourceBuilder("agg_on_id").field("id"));

    Map<String, Object> resumeKey = new HashMap<>();
    resumeKey.put("agg_on_id", searchAfterKey);

    CompositeAggregationBuilder byId = new CompositeAggregationBuilder("by_id", compositeSources);
    byId.size(10000);
    byId.aggregateAfter(resumeKey);
    byId.subAggregation(
            AggregationBuilders.topHits("latest_snapshot")
                    .sort("effectiveDate", SortOrder.DESC)
                    .size(100)
                    .fetchSource(true));

    // size(0): no top-level hits are requested, only the aggregation.
    SearchSourceBuilder source = new SearchSourceBuilder();
    source.size(0);
    source.query(filter);
    source.aggregation(byId);

    SearchRequest request = new SearchRequest("from_dynamo");
    request.source(source);
    System.out.println(request.source().toString());

    // Execute and read back the composite aggregation.
    SearchResponse response = client.search(request, RequestOptions.DEFAULT);
    ParsedComposite composite = response.getAggregations().get("by_id");

    for (ParsedComposite.ParsedBucket bucket : composite.getBuckets()) {
        for (Aggregation subAggregation : bucket.getAggregations().asList()) {
            for (SearchHit hit : ((ParsedTopHits) subAggregation).getHits().getHits()) {
                System.out.println(hit.getSourceAsString());
            }
        }
    }

    Map<Object, Object> result = new HashMap<>();
    boolean noBuckets = composite.getBuckets().isEmpty();
    result.put("emptyBuckets", noBuckets);
    if (noBuckets) {
        return result;
    }
    // Only a non-empty page carries an after-key entry for the next request.
    Integer nextAfterKey = (Integer) composite.afterKey().get("agg_on_id");
    result.put("afterKeyVal", nextAfterKey);
    return result;
}
Thanks,
Florin