When using asyncSearch for long-running queries, the responses I get from asyncSearch.get requests can't be parsed.
It looks like the aggregation types are missing from the result, which leads to errors like:
java.io.IOException: Unable to parse response body for Response{requestLine=GET /_async_search/FlNkblg2WnM4VFRlODM4UW1PYXRrN1EcRlJUcjY1SkxST0tWQzFkZTItalprUToyNDgxNA==?wait_for_completion_timeout=1s&keep_alive=10m HTTP/1.1, host=http://localhost:9200, response=HTTP/1.1 200 OK}
Caused by: org.elasticsearch.common.xcontent.XContentParseException: [1:173015] [submit_async_search_response] failed to parse field [response]
at org.elasticsearch.common.xcontent.ObjectParser.parseValue(ObjectParser.java:520)
at org.elasticsearch.common.xcontent.ObjectParser.parseSub(ObjectParser.java:530)
at org.elasticsearch.common.xcontent.ObjectParser.parse(ObjectParser.java:313)
at org.elasticsearch.common.xcontent.ConstructingObjectParser.parse(ConstructingObjectParser.java:160)
at org.elasticsearch.common.xcontent.ConstructingObjectParser.apply(ConstructingObjectParser.java:152)
at org.elasticsearch.client.asyncsearch.AsyncSearchResponse.fromXContent(AsyncSearchResponse.java:182)
at org.elasticsearch.client.RestHighLevelClient.parseEntity(RestHighLevelClient.java:1933)
at org.elasticsearch.client.RestHighLevelClient.lambda$performRequestAndParseEntity$9(RestHighLevelClient.java:1607)
at org.elasticsearch.client.RestHighLevelClient.internalPerformRequest(RestHighLevelClient.java:1671)
... 35 more
Caused by: ParsingException[Could not parse aggregation keyed as [t]]
at org.elasticsearch.search.aggregations.Aggregations.fromXContent(Aggregations.java:136)
at org.elasticsearch.action.search.SearchResponse.innerFromXContent(SearchResponse.java:322)
at org.elasticsearch.client.asyncsearch.AsyncSearchResponse.parseSearchResponse(AsyncSearchResponse.java:178)
at org.elasticsearch.client.asyncsearch.AsyncSearchResponse.lambda$static$1(AsyncSearchResponse.java:169)
at org.elasticsearch.common.xcontent.AbstractObjectParser.lambda$declareObject$1(AbstractObjectParser.java:159)
at org.elasticsearch.common.xcontent.ObjectParser.lambda$declareField$9(ObjectParser.java:375)
at org.elasticsearch.common.xcontent.ObjectParser.parseValue(ObjectParser.java:518)
... 43 more
Is there anything I can do about this? Otherwise a lot of people should be running into the same issue, shouldn't they?
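For context, one likely explanation (an interpretation of the stack trace, not something I have verified): when a search response is requested with the `typed_keys` URL parameter, Elasticsearch prefixes every aggregation key with its type, separated by `#` (e.g. `sterms#t` for a string terms aggregation named `t`), and the high-level client relies on that prefix to pick the right aggregation parser. A minimal, purely illustrative helper (hypothetical, not the actual client code) showing the convention and the matching failure mode:

```java
public class TypedKeysDemo {

    // Mirrors the "type#name" convention produced by typed_keys=true:
    // a terms aggregation named "t" is keyed as "sterms#t" in the JSON.
    // Without the prefix there is no way to tell which aggregation
    // implementation to parse, which matches the
    // "Could not parse aggregation keyed as [t]" failure above.
    static String[] splitTypedKey(String key) {
        int sep = key.indexOf('#');
        if (sep < 0) {
            throw new IllegalArgumentException(
                    "Could not parse aggregation keyed as [" + key + "]");
        }
        return new String[] { key.substring(0, sep), key.substring(sep + 1) };
    }

    public static void main(String[] args) {
        String[] parts = splitTypedKey("sterms#t");
        System.out.println(parts[0] + " / " + parts[1]); // prints "sterms / t"
    }
}
```

If the get responses come back without those prefixes while the submit response has them, that would explain why only the polling path fails to parse.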
Sorry for the delay. It took a while until I found a data set where the query execution takes longer than 1 second.
This matters for reproducing the issue: if the query completes within 1 second, the initial async search submit request already contains the result, and that response can be parsed without any issues.
Only when subsequent async search get requests are needed to wait for the query to finish does the problem appear: the response of the async search get request cannot be parsed.
I can reproduce the issue with the following small Java snippet:
import org.apache.http.HttpHost;
import org.elasticsearch.ElasticsearchStatusException;
import org.elasticsearch.action.admin.indices.delete.DeleteIndexRequest;
import org.elasticsearch.action.admin.indices.refresh.RefreshRequest;
import org.elasticsearch.action.bulk.BulkRequest;
import org.elasticsearch.action.index.IndexRequest;
import org.elasticsearch.client.RestClient;
import org.elasticsearch.client.RestHighLevelClient;
import org.elasticsearch.client.asyncsearch.AsyncSearchResponse;
import org.elasticsearch.client.asyncsearch.GetAsyncSearchRequest;
import org.elasticsearch.client.asyncsearch.SubmitAsyncSearchRequest;
import org.elasticsearch.client.indices.CreateIndexRequest;
import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.common.xcontent.XContentBuilder;
import org.elasticsearch.common.xcontent.XContentFactory;
import org.elasticsearch.rest.RestStatus;
import org.elasticsearch.search.aggregations.bucket.nested.NestedAggregationBuilder;
import org.elasticsearch.search.aggregations.bucket.terms.TermsAggregationBuilder;
import org.elasticsearch.search.aggregations.metrics.PercentilesAggregationBuilder;
import org.elasticsearch.search.builder.SearchSourceBuilder;

import java.io.IOException;

import static org.elasticsearch.client.RequestOptions.DEFAULT;

public class AsyncSearch {

    public static final String INDEX_NAME = "async";
    public static final String NESTED_PATH_NAME = "nested";

    private final RestHighLevelClient client;

    public AsyncSearch() {
        this.client = new RestHighLevelClient(
                RestClient.builder(
                        new HttpHost("localhost", 9200, "http")
                )
        );
    }

    public static void main(String[] args) throws IOException {
        AsyncSearch as = new AsyncSearch();
        try {
            as.prepareData();
            System.out.println("***** The following query execution works, since the initial submit request waits up " +
                    "to 5 seconds. The query can be executed within that time and the response for the asyncSearch " +
                    "submit request can be parsed without any problems *****");
            as.executeQuery(5);
            as.cleanUp();

            as.prepareData();
            System.out.println("***** The following query execution does not work, since the initial submit request waits up " +
                    "to 1 second. The query can not be executed within that time and the final response for the subsequent asyncSearch " +
                    "get requests can not be parsed without problems *****");
            as.executeQuery(1);
            as.cleanUp();
        } finally {
            as.client.close();
        }
    }

    private void prepareData() throws IOException {
        DeleteIndexRequest deleteIndexRequest = new DeleteIndexRequest(INDEX_NAME);
        try {
            client.indices().delete(deleteIndexRequest, DEFAULT);
        } catch (ElasticsearchStatusException e) {
            if (e.status() != RestStatus.NOT_FOUND) {
                throw e;
            }
        }

        CreateIndexRequest createIndexRequest = new CreateIndexRequest(INDEX_NAME);
        XContentBuilder mappingBuilder = XContentFactory.jsonBuilder();
        mappingBuilder.startObject();
        {
            mappingBuilder.startObject("properties");
            {
                mappingBuilder.startObject(NESTED_PATH_NAME);
                {
                    mappingBuilder.field("type", "nested");
                }
                mappingBuilder.endObject();
            }
            mappingBuilder.endObject();
        }
        mappingBuilder.endObject();
        createIndexRequest.mapping(mappingBuilder);
        client.indices().create(createIndexRequest, DEFAULT);

        BulkRequest bulkRequest = new BulkRequest();
        for (int i = 0; i < 100; i++) {
            XContentBuilder builder = XContentFactory.jsonBuilder();
            builder.startObject();
            {
                builder.field("A", "A_" + i);
                builder.startArray(NESTED_PATH_NAME);
                {
                    for (int j = 0; j < 10000; j++) {
                        builder.startObject();
                        {
                            builder.field("N_A", "N_A_" + i + j);
                            builder.field("N_B", j);
                        }
                        builder.endObject();
                    }
                }
                builder.endArray();
            }
            builder.endObject();
            bulkRequest.add(new IndexRequest(INDEX_NAME).source(builder));
        }
        client.bulk(bulkRequest, DEFAULT);

        RefreshRequest refreshRequest = new RefreshRequest(INDEX_NAME);
        client.indices().refresh(refreshRequest, DEFAULT);
    }

    private void executeQuery(int waitTimeSeconds) throws IOException {
        SearchSourceBuilder searchSource = new SearchSourceBuilder().size(0)
                .aggregation(new TermsAggregationBuilder("t").field("A.keyword").size(100)
                        .subAggregation(new NestedAggregationBuilder("n", NESTED_PATH_NAME)
                                .subAggregation(new TermsAggregationBuilder("nt").field(NESTED_PATH_NAME + ".N_A.keyword").size(100)
                                        .subAggregation(new PercentilesAggregationBuilder("p").field(NESTED_PATH_NAME + ".N_B"))
                                )
                        )
                );
        String[] indices = new String[]{INDEX_NAME};
        SubmitAsyncSearchRequest submitRequest = new SubmitAsyncSearchRequest(searchSource, indices);
        submitRequest.setWaitForCompletionTimeout(TimeValue.timeValueSeconds(waitTimeSeconds));
        AsyncSearchResponse response = client.asyncSearch().submit(submitRequest, DEFAULT);
        System.out.println("AsyncSearch submit: isRunning=" + response.isRunning() + " isPartial=" + response.isPartial());

        GetAsyncSearchRequest getRequest = new GetAsyncSearchRequest(response.getId());
        getRequest.setWaitForCompletion(TimeValue.timeValueSeconds(waitTimeSeconds));
        while (response.isRunning()) {
            response = client.asyncSearch().get(getRequest, DEFAULT);
            System.out.println("AsyncSearch get: isRunning=" + response.isRunning() + " isPartial=" + response.isPartial());
        }
    }

    private void cleanUp() throws IOException {
        DeleteIndexRequest deleteIndexRequest = new DeleteIndexRequest(INDEX_NAME);
        client.indices().delete(deleteIndexRequest, DEFAULT);
    }
}
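If the failing get responses are indeed missing the aggregation type prefixes, the difference in the raw JSON would look roughly like the following (hypothetical fragments sketched for illustration, not captured from an actual response):

```json
// parseable: aggregation key carries the type prefix (typed_keys behavior)
{"response": {"aggregations": {"sterms#t": {"buckets": []}}}}

// not parseable: plain aggregation name, the client cannot resolve the type
{"response": {"aggregations": {"t": {"buckets": []}}}}
```

If that is what is happening, a possible interim check/workaround would be to fetch the result through the low-level client (`client.getLowLevelClient()`) with `typed_keys=true` set explicitly on the request URL, or to verify whether a newer client version sends that parameter on the async search get request as well.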