Java High Level REST Client: how to deal with aggregations?

Hi,

I am trying out the high level Java client for Elasticsearch.

In a simple test I managed to query some records, but I cannot get aggregations running.

As I understand it, I need to add both the query and the aggregation to the same SearchSourceBuilder, which is then set on the SearchRequest object. Is that correct so far?

My test application throws the following exception when executing

searchResponse = client.search(searchRequest);

Exception in thread "main" ElasticsearchStatusException[Elasticsearch exception [type=search_phase_execution_exception, reason=all shards failed]]
	at org.elasticsearch.rest.BytesRestResponse.errorFromXContent(BytesRestResponse.java:177)
	at org.elasticsearch.client.RestHighLevelClient.parseEntity(RestHighLevelClient.java:573)
	at org.elasticsearch.client.RestHighLevelClient.parseResponseException(RestHighLevelClient.java:549)
	at org.elasticsearch.client.RestHighLevelClient.performRequest(RestHighLevelClient.java:456)
	at org.elasticsearch.client.RestHighLevelClient.performRequestAndParseEntity(RestHighLevelClient.java:429)
	at org.elasticsearch.client.RestHighLevelClient.search(RestHighLevelClient.java:368)
	at com.tsystems.elasticsearchclientlts.Test.testAggregation(Test.java:89)
	at com.tsystems.elasticsearchclientlts.Test.main(Test.java:122)
	Suppressed: org.elasticsearch.client.ResponseException: method [GET], host [http://localhost:9999], URI [/_search?typed_keys=true&ignore_unavailable=false&expand_wildcards=open&allow_no_indices=true&search_type=query_then_fetch&batched_reduce_size=512], status line [HTTP/1.1 500 Internal Server Error]
{"error":{"root_cause":[{"type":"illegal_state_exception","reason":"value source config is invalid; must have either a field context or a script or marked as unwrapped"}],"type":"search_phase_execution_exception","reason":"all shards failed","phase":"query","grouped":true,"failed_shards":[{"shard":0,"index":".kibana-6","node":"qaspBNsmRdaXVI8Lq5qj8w","reason":{"type":"illegal_state_exception","reason":"value source config is invalid; must have either a field context or a script or marked as unwrapped"}}]},"status":500}
		at org.elasticsearch.client.RestClient$1.completed(RestClient.java:357)
		at org.elasticsearch.client.RestClient$1.completed(RestClient.java:346)
		at org.apache.http.concurrent.BasicFuture.completed(BasicFuture.java:119)
		at org.apache.http.impl.nio.client.DefaultClientExchangeHandlerImpl.responseCompleted(DefaultClientExchangeHandlerImpl.java:177)
		at org.apache.http.nio.protocol.HttpAsyncRequestExecutor.processResponse(HttpAsyncRequestExecutor.java:436)
		at org.apache.http.nio.protocol.HttpAsyncRequestExecutor.inputReady(HttpAsyncRequestExecutor.java:326)
		at org.apache.http.impl.nio.client.InternalRequestExecutor.inputReady(InternalRequestExecutor.java:83)
		at org.apache.http.impl.nio.DefaultNHttpClientConnection.consumeInput(DefaultNHttpClientConnection.java:265)
		at org.apache.http.impl.nio.client.InternalIODispatch.onInputReady(InternalIODispatch.java:81)
		at org.apache.http.impl.nio.client.InternalIODispatch.onInputReady(InternalIODispatch.java:39)
		at org.apache.http.impl.nio.reactor.AbstractIODispatch.inputReady(AbstractIODispatch.java:114)
		at org.apache.http.impl.nio.reactor.BaseIOReactor.readable(BaseIOReactor.java:162)
		at org.apache.http.impl.nio.reactor.AbstractIOReactor.processEvent(AbstractIOReactor.java:337)
		at org.apache.http.impl.nio.reactor.AbstractIOReactor.processEvents(AbstractIOReactor.java:315)
		at org.apache.http.impl.nio.reactor.AbstractIOReactor.execute(AbstractIOReactor.java:276)
		at org.apache.http.impl.nio.reactor.BaseIOReactor.execute(BaseIOReactor.java:104)
		at org.apache.http.impl.nio.reactor.AbstractMultiworkerIOReactor$Worker.run(AbstractMultiworkerIOReactor.java:588)
		at java.lang.Thread.run(Thread.java:748)

Here is the code snippet from my application:

private static void testAggregation(RestHighLevelClient client, String myQueryString) throws IOException
{
    // test of aggregation

    // building the search (query + aggregations)
    SearchSourceBuilder searchSourceBuilder = new SearchSourceBuilder();

    // setting the query which is used for the aggregation
    searchSourceBuilder.query(QueryBuilders.queryStringQuery(myQueryString));

    // build the aggregation
    TermsAggregationBuilder aggregation = AggregationBuilders.terms("by_msisdn")
            .field("msisdn.keyword");
    aggregation.subAggregation(AggregationBuilders.count("count"));
    searchSourceBuilder.aggregation(aggregation);

    // create the request
    SearchRequest searchRequest = new SearchRequest();
    searchRequest.source(searchSourceBuilder);

    // fire the request and get the response
    SearchResponse searchResponse = client.search(searchRequest);

    // get aggregations:
    Aggregations aggregations = searchResponse.getAggregations();
    Terms byMsisdn = aggregations.get("by_msisdn");
}



/**
 * @param args
 */
public static void main(String[] args)
{
    String server = args[0];
    int port = Integer.parseInt(args[1]);
    
    
    RestHighLevelClient client = new RestHighLevelClient(
            RestClient.builder(
                    new HttpHost(server, port, "http")));
    
    // define the complete query string, like you do it in kibana or timelion
    String myQueryString = "logType: ttp OR type: ttp";
    
    try
    {
        // call query test procedure
        testQuerySearch(client, myQueryString);
        
        // call the aggregation test procedure
        //testAggregation(client, myQueryString);
        
        client.close();
    } 
    catch (IOException e)
    {
        // TODO Auto-generated catch block
        e.printStackTrace();
    }
}

Thanks, Andreas

How can I set the index pattern in which to search?

I noticed that the aggregation failed when running on a system index.
I changed the following:

SearchRequest searchRequest = new SearchRequest();
to
SearchRequest searchRequest = new SearchRequest(index);
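
For reference, SearchRequest accepts one or more index names or patterns, so the pattern can be passed straight to the constructor. A minimal sketch (the pattern tux-* is taken from the request URI in the stack trace below):

    // restrict the search to indices matching tux-*
    SearchRequest searchRequest = new SearchRequest("tux-*");
    searchRequest.source(searchSourceBuilder);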

Now I have added the correct index pattern (see above), but I get another exception for the aggregation. The query on its own still runs fine:

Exception in thread "main" ElasticsearchStatusException[Elasticsearch exception [type=search_phase_execution_exception, reason=all shards failed]]
	at org.elasticsearch.rest.BytesRestResponse.errorFromXContent(BytesRestResponse.java:177)
	at org.elasticsearch.client.RestHighLevelClient.parseEntity(RestHighLevelClient.java:573)
	at org.elasticsearch.client.RestHighLevelClient.parseResponseException(RestHighLevelClient.java:549)
	at org.elasticsearch.client.RestHighLevelClient.performRequest(RestHighLevelClient.java:456)
	at org.elasticsearch.client.RestHighLevelClient.performRequestAndParseEntity(RestHighLevelClient.java:429)
	at org.elasticsearch.client.RestHighLevelClient.search(RestHighLevelClient.java:368)
	at com.tsystems.elasticsearchclientlts.Test.testAggregation(Test.java:89)
	at com.tsystems.elasticsearchclientlts.Test.main(Test.java:123)
	Suppressed: org.elasticsearch.client.ResponseException: method [GET], host [http://localhost:9999], URI [/tux-*/_search?typed_keys=true&ignore_unavailable=false&expand_wildcards=open&allow_no_indices=true&search_type=query_then_fetch&batched_reduce_size=512], status line [HTTP/1.1 500 Internal Server Error]
{"error":{"root_cause":[{"type":"illegal_state_exception","reason":"value source config is invalid; must have either a field context or a script or marked as unwrapped"}],"type":"search_phase_execution_exception","reason":"all shards failed","phase":"query","grouped":true,"failed_shards":[{"shard":0,"index":"tux-prod-2017.08.12","node":"qaspBNsmRdaXVI8Lq5qj8w","reason":{"type":"illegal_state_exception","reason":"value source config is invalid; must have either a field context or a script or marked as unwrapped"}}]},"status":500}
		at org.elasticsearch.client.RestClient$1.completed(RestClient.java:357)
		at org.elasticsearch.client.RestClient$1.completed(RestClient.java:346)
		at org.apache.http.concurrent.BasicFuture.completed(BasicFuture.java:119)
		at org.apache.http.impl.nio.client.DefaultClientExchangeHandlerImpl.responseCompleted(DefaultClientExchangeHandlerImpl.java:177)
		at org.apache.http.nio.protocol.HttpAsyncRequestExecutor.processResponse(HttpAsyncRequestExecutor.java:436)
		at org.apache.http.nio.protocol.HttpAsyncRequestExecutor.inputReady(HttpAsyncRequestExecutor.java:326)
		at org.apache.http.impl.nio.client.InternalRequestExecutor.inputReady(InternalRequestExecutor.java:83)
		at org.apache.http.impl.nio.DefaultNHttpClientConnection.consumeInput(DefaultNHttpClientConnection.java:265)
		at org.apache.http.impl.nio.client.InternalIODispatch.onInputReady(InternalIODispatch.java:81)
		at org.apache.http.impl.nio.client.InternalIODispatch.onInputReady(InternalIODispatch.java:39)
		at org.apache.http.impl.nio.reactor.AbstractIODispatch.inputReady(AbstractIODispatch.java:114)
		at org.apache.http.impl.nio.reactor.BaseIOReactor.readable(BaseIOReactor.java:162)
		at org.apache.http.impl.nio.reactor.AbstractIOReactor.processEvent(AbstractIOReactor.java:337)
		at org.apache.http.impl.nio.reactor.AbstractIOReactor.processEvents(AbstractIOReactor.java:315)
		at org.apache.http.impl.nio.reactor.AbstractIOReactor.execute(AbstractIOReactor.java:276)
		at org.apache.http.impl.nio.reactor.BaseIOReactor.execute(BaseIOReactor.java:104)
		at org.apache.http.impl.nio.reactor.AbstractMultiworkerIOReactor$Worker.run(AbstractMultiworkerIOReactor.java:588)
		at java.lang.Thread.run(Thread.java:748)


Make sure you are using the same major version of Elasticsearch for the client and the server.

Thanks, I got a bit further.
The problem was the count sub-aggregation below my terms aggregation. I removed it and it works.
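
That matches the error above: a plain count aggregation has neither a field nor a script, which is exactly the "value source config is invalid" complaint, and the terms aggregation already reports a doc_count per bucket anyway. A minimal sketch of the valid variants (field name taken from the snippet above):

    // the terms aggregation alone already yields doc_count per bucket
    TermsAggregationBuilder aggregation = AggregationBuilders.terms("by_msisdn")
            .field("msisdn.keyword");

    // if an explicit value_count is wanted, it needs a field (or a script)
    aggregation.subAggregation(AggregationBuilders.count("count").field("msisdn.keyword"));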


I've got a further question.

Now I need to filter the results for a specific time range.

I got this far. It works, but it is too static in my eyes:

    List<QueryBuilder> mustQueries = new ArrayList<QueryBuilder>();
    mustQueries.add(QueryBuilders.queryStringQuery(myQueryString));
    mustQueries.add(QueryBuilders.rangeQuery("@timestamp").gte("now-30d/d").lt("now/d"));
    mustQueries.get(0).toString();
    mustQueries.get(1).toString();

    
    QueryBuilder query = QueryBuilders.boolQuery().must(mustQueries.get(0)).must(mustQueries.get(1));

So far I have only found the way of chaining the different must queries one after another.
How can I directly access the bool query behind the query variable? Or how can I add another must query later?

I don't know yet whether I will have a fixed number of queries that need to be combined with AND, so I would like to create a list of them and add them dynamically.

Thanks, Andreas

I did not test it, but is this what you want to do?

List<QueryBuilder> queries = new ArrayList<QueryBuilder>();
queries.add(QueryBuilders.queryStringQuery(myQueryString));
queries.add(QueryBuilders.rangeQuery("@timestamp").gte("now-30d/d").lt("now/d"));

BoolQueryBuilder bool = QueryBuilders.boolQuery();
for (QueryBuilder query : queries) {
   bool.must(query);
}
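
For completeness, a sketch of how this could plug into the earlier snippet: BoolQueryBuilder is mutable, so you can keep a reference, add further must clauses whenever they become known, and only then hand it to the SearchSourceBuilder (the extra termQuery is a hypothetical example clause):

    BoolQueryBuilder bool = QueryBuilders.boolQuery();
    bool.must(QueryBuilders.queryStringQuery(myQueryString));
    bool.must(QueryBuilders.rangeQuery("@timestamp").gte("now-30d/d").lt("now/d"));

    // more clauses can be added at any point before the request is executed
    bool.must(QueryBuilders.termQuery("logType", "ttp"));   // hypothetical extra filter

    searchSourceBuilder.query(bool);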

Thanks, that worked for me.

Now it is quite clear to me how to build the queries and run them against Elasticsearch.

But I have problems accessing the buckets and aggregations of the response.

I get the following response:

{
  "took": 6,
  "timed_out": false,
  "_shards": {
    "total": 17,
    "successful": 17,
    "skipped": 0,
    "failed": 0
  },
  "hits": {
    "total": 947911,
    "max_score": 0,
    "hits": []
  },
  "aggregations": {
    "grouped_by_hour": {
      "buckets": [
        {
          "key_as_string": "2017-12-03T00:00:00.000Z",
          "key": 1512259200000,
          "doc_count": 481567,
          "by_serviceCall": {
            "doc_count_error_upper_bound": 3744,
            "sum_other_doc_count": 287277,
            "buckets": [
              {
                "key": "DB_TMMESSAGES",
                "doc_count": 114124,
                "avg_proc_time": {
                  "value": 0.05604605518401845
                }
              },
              {
                "key": "DB_LWVD_LIST",
                "doc_count": 80003,
                "avg_proc_time": {
                  "value": 1.552289284874029
                }
              }
            ]
          }
        },
        {
          "key_as_string": "2017-12-03T12:00:00.000Z",
          "key": 1512302400000,
          "doc_count": 466344,
          "by_serviceCall": {
            "doc_count_error_upper_bound": 7049,
            "sum_other_doc_count": 269121,
            "buckets": [
              {
                "key": "DB_TMMESSAGES",
                "doc_count": 115923,
                "avg_proc_time": {
                  "value": 0.060222733712912695
                }
              },
              {
                "key": "DB_LWVD_LIST",
                "doc_count": 81152,
                "avg_proc_time": {
                  "value": 1.219554660065384
                }
              }
            ]
          }
        }
      ]
    }
  }
}

Here is my source snippet:

    // fire the request and get the response
    SearchResponse searchResponse = client.search(searchRequest);
    System.out.println("query source: " + searchRequest.source());
    
    // get aggregations:
    Aggregations aggResult = searchResponse.getAggregations();
    System.out.println("\n\n" + searchResponse.toString());
    
    for (Aggregation level0Agg : aggResult) 
    {
        if (level0Agg.getType().equals(DateHistogramAggregationBuilder.NAME))
        {
            ParsedDateHistogram myAgg = aggResult.get(level0Agg.getName());
            for (org.elasticsearch.search.aggregations.bucket.histogram.Histogram.Bucket bucketLevel0 : myAgg.getBuckets())
            {
                System.out.println("key of time bucket: " + bucketLevel0.getKey());
                System.out.println("   docCount: " + bucketLevel0.getDocCount());

                
                for (Aggregation level1Agg : bucketLevel0.getAggregations())
                {
                    System.out.println("   bucket Type level 2: " + level1Agg.getType());
                    System.out.println("   bucket Name level 2: " + level1Agg.getName());
                                            
                }
            }
        }
    }

I am getting this far. Here is the output when running:

key of time bucket: 2017-12-03T00:00:00.000Z
   docCount: 481567
   bucket Type level 2: sterms
   bucket Name level 2: by_serviceCall
key of time bucket: 2017-12-03T12:00:00.000Z
   docCount: 466344
   bucket Type level 2: sterms
   bucket Name level 2: by_serviceCall

So, I can get into the date histogram bucket and can output the key (timestamp) and the doc count of this bucket.
When I get the aggregations inside the timestamp bucket I am able to get their name and type, but how can I access their buckets? I am missing a "getBuckets()" method there that would give me the buckets as an iterable object.

I am not a full-time developer, so please forgive me if the solution is really obvious :wink:

Thanks, Andreas
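
For reference, a minimal sketch of one way this could work inside the inner loop of the snippet above, assuming the 6.x client: the inner Aggregation can be cast to Terms (org.elasticsearch.search.aggregations.bucket.terms.Terms), which does expose getBuckets(), and leaf metrics such as the avg can then be fetched by name:

    for (Aggregation level1Agg : bucketLevel0.getAggregations())
    {
        if (level1Agg instanceof Terms)
        {
            // the Terms interface (not the builder) exposes getBuckets()
            Terms byServiceCall = (Terms) level1Agg;
            for (Terms.Bucket bucketLevel1 : byServiceCall.getBuckets())
            {
                System.out.println("      key of terms bucket: " + bucketLevel1.getKeyAsString());
                System.out.println("      docCount: " + bucketLevel1.getDocCount());

                // leaf metric: the avg sub-aggregation, fetched by name
                // (org.elasticsearch.search.aggregations.metrics.avg.Avg in 6.x)
                Avg avgProcTime = bucketLevel1.getAggregations().get("avg_proc_time");
                System.out.println("      avg_proc_time: " + avgProcTime.getValue());
            }
        }
    }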


Maybe some words about the target I want to reach:

We do not have the resources to keep all our data much longer than 30 days.
We do not want to get rid of old data completely, and for older data bucket aggregations based on 3 or 6 hours are enough to compare current and past periods.

So I would like to compute some aggregations in 6-hour buckets and store these aggregations in a long-term storage index.

That way we only have some hundred events per 6 hours instead of millions. It's a good alternative to saving dashboards that show a month as PDF, as we currently do.
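
For reference, a sketch of an aggregation tree that would produce a response shaped like the one above (aggregation names taken from that response; the 6-hour interval follows the stated goal, and the field names serviceCall.keyword and procTime are assumptions):

    // date histogram in 6-hour buckets, with a terms and an avg sub-aggregation
    AggregationBuilder aggregation = AggregationBuilders
            .dateHistogram("grouped_by_hour")
            .field("@timestamp")
            .dateHistogramInterval(DateHistogramInterval.hours(6))
            .subAggregation(AggregationBuilders.terms("by_serviceCall")
                    .field("serviceCall.keyword")                   // assumed field name
                    .subAggregation(AggregationBuilders.avg("avg_proc_time")
                            .field("procTime")));                   // assumed field name
    searchSourceBuilder.aggregation(aggregation);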

So from the current example I would like to store into the new event:

@timestamp: from key_as_string of the level-0 (top-level) bucket
by_serviceCall: key of the level-1 bucket
doc_count: doc_count of the deepest bucket
proctime.avg: value of avg_proc_time

Later there may be some more nested terms aggregations.
Values will always be taken from the deepest level, and the bucket keys will be stored as fields.
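
For illustration, a minimal sketch of writing such a flattened event back with the high level client's index API (the index name lts-aggregates and the type doc are placeholders; it needs java.util.Map/HashMap and org.elasticsearch.action.index.IndexRequest):

    // hypothetical helper: stores one flattened event per leaf bucket
    private static void storeAggregate(RestHighLevelClient client, String timestamp,
            String serviceCall, long docCount, double avgProcTime) throws IOException
    {
        Map<String, Object> event = new HashMap<>();
        event.put("@timestamp", timestamp);         // key_as_string of the top-level bucket
        event.put("by_serviceCall", serviceCall);   // key of the level-1 terms bucket
        event.put("doc_count", docCount);
        event.put("proctime.avg", avgProcTime);     // value of avg_proc_time

        IndexRequest request = new IndexRequest("lts-aggregates", "doc").source(event);
        client.index(request);
    }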

Or is there a simpler method than using the functionality of the high level client?

I want it to be quite generic, so that new queries can easily be configured without changing the code, in case we decide we need more data to be aggregated and saved to long-term storage.

Can anybody provide info on how I can access the buckets and aggregations?

This topic was automatically closed 28 days after the last reply. New replies are no longer allowed.