Hey @s1monw, there are several use cases for what I described above. One such use case is called query expansion, and one way to implement it is by computing a relevance model over the search results and extracting its top terms. The top X terms, each with its own weight (= probability), are considered the expanded query.
In order to compute a relevance model, you need access to term vectors. Instead of computing all that on the client (and therefore pulling potentially huge vectors from the server), I would like to implement a plugin which does that. Specifically, if you submit a request to `/myindex/type/_qexpansion`, you would get back the expanded query terms. Actually, I was hoping that (1) this could be done under the `_search` endpoint, and (2) that the response would include the expanded terms, as well as whatever else the user chose to return (documents, facets etc.).
There are technically 3 approaches to computing the relevance model, each varying in efficiency and quality:
Approach 1
On the coordinating node, issue a `MultiTermVectorsAction` for all top-K documents after the search is done, and then compute the RM locally. As far as I understand, this fits the `SearchResponseListener` object, as it will be called post-search. Please correct me if I'm wrong.
Approach 2
In order to avoid pulling all K term vectors from the different nodes to the coordinating node, I have (so far) implemented my own `TransportQueryExpansionAction`, which (again, post-search) computes an RM on each shard node and then combines the different RMs on the coordinating node. Basically, the code looks like this:
```java
@Override
protected void doExecute(QueryExpansionRequest request, ActionListener<QueryExpansionResponse> listener) {
    try {
        final SearchRequest searchRequest = request.getSearchRequest();
        searchRequest.indices(request.index()).types(request.type());
        searchAction.execute(searchRequest, new SearchActionListener(listener, searchRequest, request));
    } catch (final Exception e) {
        listener.onFailure(e);
    }
}
```
And `SearchActionListener.onResponse` looks like this:
```java
@Override
public void onResponse(SearchResponse response) {
    final List<String> docs = Arrays.stream(response.getHits().getHits())
            .map(SearchHit::getId)
            .collect(Collectors.toList());
    final ClusterState clusterState = clusterService.state();
    clusterState.blocks().globalBlockedRaiseException(ClusterBlockLevel.READ);
    final String routing = clusterState.metaData().resolveIndexRouting(null, null, request.index());
    // loop-invariant, so resolved once up front
    final String concreteSingleIndex =
            indexNameExpressionResolver.concreteSingleIndex(clusterState, request).getName();
    // group the hit doc ids by the shard that holds them
    final Map<ShardId, QueryExpansionShardRequest> shardRequests = new HashMap<>();
    for (final String doc : docs) {
        final ShardId shardId = clusterService.operationRouting().shardId(clusterState, concreteSingleIndex,
                doc, routing);
        QueryExpansionShardRequest shardRequest = shardRequests.get(shardId);
        if (shardRequest == null) {
            shardRequest = new ...;
            shardRequests.put(shardId, shardRequest); // register the new per-shard request
        }
        shardRequest.getDocs().add(doc);
    }
    for (final Map.Entry<ShardId, QueryExpansionShardRequest> e : shardRequests.entrySet()) {
        shardAction.execute(e.getValue(), new QueryExpansionActionListener(...));
    }
}
```
That way, I am able to execute the search under my `_qexpansion` endpoint by injecting a `TransportSearchAction`, and when it completes, I execute my distributed query expansion logic. Each shard computes a local RM, and in `QueryExpansionActionListener.onResponse()` I merge the models and return the final `QueryExpansionResponse` to the user.
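The merge step in `QueryExpansionActionListener.onResponse()` could look roughly like the following. This is a sketch under my own assumptions, not existing code: each shard's RM is weighted by how many of the global top-K hits that shard contributed, so a shard holding more of the relevant documents influences the combined model more.

```java
import java.util.*;

// Illustrative merge of per-shard relevance models on the coordinating node.
// RmMerger and its inputs are hypothetical names, not Elasticsearch classes.
public final class RmMerger {

    /**
     * @param shardModels  one normalized term->weight map per shard
     * @param hitsPerShard how many of the global top-K hits each shard held,
     *                     same order as shardModels
     */
    public static Map<String, Double> merge(List<Map<String, Double>> shardModels,
                                            List<Integer> hitsPerShard) {
        Map<String, Double> merged = new HashMap<>();
        int totalHits = hitsPerShard.stream().mapToInt(Integer::intValue).sum();
        for (int i = 0; i < shardModels.size(); i++) {
            // a shard's influence is proportional to its share of the top-K hits
            double shardWeight = (double) hitsPerShard.get(i) / totalHits;
            for (Map.Entry<String, Double> e : shardModels.get(i).entrySet()) {
                merged.merge(e.getKey(), shardWeight * e.getValue(), Double::sum);
            }
        }
        return merged;
    }
}
```

Since the per-shard models are normalized and the shard weights sum to 1, the merged map is again a probability distribution, ready to be truncated to the top X terms for the response.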
Approach 3
While approach 2 performs faster than approach 1, there is still a downside: the RM computed on each shard is based only on the term vectors of its local documents (the ones among the top-K search results that it holds), and some models may not produce the best RM because of that.
Another approach that I thought could be interesting to try: as part of the original search request, each shard computes a local RM for its top-K documents, and the coordinating node then handles merging those RMs into one.
This, as far as I understand, requires extending the search operation in several places:
- Add some phase, like `FetchSubPhase`, let's call it `PostSubPhase` (for lack of a better name), that a `SearchPlugin` would provide and each shard would execute. It would receive the local search hits, or basically the entire local search response, and be able to compute something, in my case a local RM.
- Allow storing some arbitrary item (`Streamable`?) on the local search response.
- Add a `PostSearchPhase` that would receive the different search responses, extract that `Streamable`, and compute the result. And again, be able to store some arbitrary object (`ToXContent` maybe?) on the final `SearchResponse`.
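The rough shape of those two extension points might be something like the sketch below. To be clear, every name here (`PostSubPhase`, `PostSearchPhase`, the generic payload `T`) is hypothetical, not an existing Elasticsearch interface: a per-shard hook turns the local hits into a serializable partial result, and a coordinating-node hook reduces the partials into the final response payload.

```java
import java.util.*;

// Hypothetical extension points for approach 3 (not Elasticsearch APIs).
public final class PostPhaseSketch {

    /** Runs on each shard after the local search completes.
        T stands in for the Streamable partial result
        (a local relevance model, in my case). */
    interface PostSubPhase<T> {
        T executeOnShard(List<String> localHitIds);
    }

    /** Runs on the coordinating node over the per-shard partial results,
        producing the object (ToXContent?) attached to the final response. */
    interface PostSearchPhase<T, R> {
        R reduce(List<T> shardResults);
    }
}
```

With this shape, my use case would plug in a `PostSubPhase` that computes the local RM and a `PostSearchPhase` that performs the merge from approach 2, but now inside the normal search execution rather than as a second round-trip.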
@s1monw now that you understand better what I want to do, and given your comment about implementing a custom aggregation: do you think this could still be implemented as an aggregation, even though it operates on a final set of results, usually much smaller than the entire search space? I've looked again at existing aggregation implementations, and while I see some that do not operate over the entire search space, I still can't see how an aggregator receives the resulting `SearchHits`... if there is a way, I'd like to explore it.