Adding a new search feature in a plugin

Hi. I am trying to add a search feature in a plugin. The feature is not e.g. a FetchSubPhase, but rather something similar to suggest/aggregation -- that is, in addition to performing the regular search request, I would also like to perform additional calculations on the results, but return the calculation result as a global entry of the search result. Let me give an example: assume that the user submits a request like:

GET /index/type/_search
{
  "query": ...,
  "size": ...,
  "my_feature": { feature-spec-goes-here }
}

They'll get a response like:

{
  "hits": [the regular search hits],
  "my_results" : { [some global result corresponding to my_feature request]}
}

I've looked at SearchPlugin.getSearchExts() and while I believe it can help me parse the my_feature part of the request, I am not sure how it can help me perform additional computations after the search is over (again, think as if I want to add a new feature like aggregations/suggest). I've also looked at FetchTermVectorsPlugin which uses this extension, but unfortunately it implements a FetchSubPhase, which is not what I need (I think!).

Am I looking in the wrong place? If I only add a MyFeatureAction, is there a way I can plug it into the _search endpoint?

We don't have support for this yet. There is also quite some work to be done to return the right things from a shard and then merge them together on the coordinating node. I am convinced it's not low-hanging fruit, and I am not sure we want to do that. Can you maybe implement it as a custom aggregation?

Maybe you could execute your action from a search response listener then modify and return the SearchResponse? You can get a hold of the Client you need to execute the action with in the createComponents method.

I actually wanted to implement a SearchResponseListener, but SearchPlugin.getSearchResponseListeners() does not seem to be called at all. I've checked both the 5.5.1 code and the current master. @s1monw can you confirm this method is indeed not used? I may have missed where it's called.

If I could use a response listener, that would be 90% great. Unfortunately, SearchResponse does not let you store arbitrary objects on it (something generic like ToXContent would do). But at least if the response listener was being called, I guess I'd find where to store the result.

@s1monw I also looked at implementing a custom aggregation, but it doesn't feel right to me. The aggregator eventually needs to return a LeafBucketCollector, whereas my computation really works on the top-K results only, and not on every potential search hit. What I really need is a post-search-like phase I could implement.

There is also quite some work to be done to also return the right things from a shard and then merge them together on the coordinating node

I think that if there was a search-post-phase I could override, ES would not need to handle the merging of results from shards; that would be my responsibility.

What I've implemented so far is a new endpoint, and I delegate the search part to TransportSearchAction, which I have injected into my transport action. I then store the SearchResponse on my response object, and it seems to work fine so far.

Still, if there was a search-post-phase, or if SearchResponseListener was being called, I imagine I'd need to write less code overall to accomplish the same task. So @s1monw, if you think making SearchResponseListener usable (ie if it's a bug that it's not being called, or a forgotten feature) is something you would consider, I don't mind looking into contributing such change to Elastic.

Regarding SearchPlugin.getSearchResponseListeners(), the uses were actually removed in a followup PR after it was added, but the method itself was never removed (ie it is dead code, as you found).

I see. Is there interest in re-adding it? Plus allowing some arbitrary ToXContent to be stored on the response?

@Shai_Erera I may be wrong (@s1monw can correct me), but I believe the problem was the thread the listener callback ran on.

@Shai_Erera I wonder how you would reduce the shard responses into a search response in your case. Could you explain in a bit more detail what you need to do / want to do? You asked for a query phase, but this happens on the shard level, while search response listeners act on the coordinating node. Can you shed some light on this?

Hey @s1monw, there are several use cases for what I described above. One such use case is called query expansion, and one way to implement it is by computing a relevance model for the search results and extract the top terms. The top X terms, each with its own weight (=probability), are considered the expanded query.

In order to compute a relevance model, you need access to term vectors. Instead of computing all that on the client (and therefore pulling potentially huge vectors from the server), I would like to implement a plugin which does that. Specifically, if you submit a request to /myindex/type/_qexpansion, you would get back the expanded query terms. Actually, I was hoping that (1) this could be done under the _search endpoint, and (2) that the response will include the expanded terms, but also whatever else the user chose to return (documents, facets etc.).

There are technically 3 approaches to computing the relevance model, each varying in efficiency and quality:

Approach 1
On the coordinating node, issue a MultiTermVectorsAction after the search is done for all top K documents, and then compute the RM locally. This method fits (as far as I understand) the SearchResponseListener object, as it will be called post-search. Please correct me if I'm wrong.
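The RM computation in approach 1 boils down to aggregating the term vectors of the top-K hits into a weighted term distribution. Here is a minimal, self-contained sketch of that step, with plain maps standing in for the term vectors a MultiTermVectorsAction would return; the frequency-sum normalization is just one possible weighting scheme, not the only way to build an RM:

```java
import java.util.*;
import java.util.stream.*;

class RelevanceModel {

    /**
     * Builds a simple relevance model from the term vectors of the top-K
     * documents: sums the per-document term frequencies, normalizes them
     * into probabilities, and keeps the top X terms by weight.
     */
    static LinkedHashMap<String, Double> topTerms(
            List<Map<String, Integer>> termVectors, int x) {
        Map<String, Double> weights = new HashMap<>();
        for (Map<String, Integer> tv : termVectors) {
            // accumulate raw term frequencies across all top-K documents
            tv.forEach((term, freq) -> weights.merge(term, (double) freq, Double::sum));
        }
        double total = weights.values().stream().mapToDouble(Double::doubleValue).sum();
        return weights.entrySet().stream()
                .sorted(Map.Entry.<String, Double>comparingByValue().reversed())
                .limit(x)
                .collect(Collectors.toMap(
                        Map.Entry::getKey,
                        e -> e.getValue() / total,   // term weight as a probability
                        (a, b) -> a,
                        LinkedHashMap::new));
    }
}
```

The resulting term-to-probability map is what would then be rendered as the `my_results` part of the response.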

Approach 2
In order to avoid pulling all K term vectors from the different nodes to the coordinating node, I implemented (so far) my own TransportQueryExpansionAction which (again, post search) computes an RM on each shard node, and then combines the different RMs on the coordinating node. Basically, the code looks like this:

@Override
protected void doExecute(QueryExpansionRequest request, ActionListener<QueryExpansionResponse> listener) {
  try {
    final SearchRequest searchRequest = request.getSearchRequest();
    searchRequest.indices(request.index()).types(request.type());
    searchAction.execute(searchRequest, new SearchActionListener(listener, searchRequest, request));
  } catch (final Exception e) {
    listener.onFailure(e);
  }
}

And SearchActionListener.onResponse looks like this:

@Override
public void onResponse(SearchResponse response) {
  final List<String> docs = Arrays.stream(response.getHits().getHits())
    .map(SearchHit::getId)
    .collect(Collectors.toList());
  
  final ClusterState clusterState = clusterService.state();
  clusterState.blocks().globalBlockedRaiseException(ClusterBlockLevel.READ);
  
  final String routing = clusterState.metaData().resolveIndexRouting(null, null, request.index());
  final Map<ShardId, QueryExpansionShardRequest> shardRequests = new HashMap<>();
  for (final String doc : docs) {
    final String concreteSingleIndex =
            indexNameExpressionResolver.concreteSingleIndex(clusterState, request).getName();
    final ShardId shardId = clusterService.operationRouting().shardId(clusterState, concreteSingleIndex,
            doc, routing);
    QueryExpansionShardRequest shardRequest = shardRequests.get(shardId);
    if (shardRequest == null) {
      shardRequest = new ...;
      shardRequests.put(shardId, shardRequest); // register the new per-shard request
    }
    shardRequest.getDocs().add(doc);
  }

  for (final Map.Entry<ShardId, QueryExpansionShardRequest> e : shardRequests.entrySet()) {
      shardAction.execute(e.getValue(), new QueryExpansionActionListener(...));
  }
}

That way, I am able to execute the search under my _qexpansion endpoint by injecting a TransportSearchAction, and when it completes, I execute my distributed query expansion logic. Each shard computes a local RM, and in QueryExpansionActionListener.onResponse() I merge the models and return the final QueryExpansionResponse to the user.
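The coordinator-side merge in approach 2 could look roughly like this self-contained sketch: each shard returns its local RM (term weights) together with the number of top-K documents it contributed, and the coordinator mixes the models weighted by that contribution. The `ShardRm` type and the mixing scheme are illustrative assumptions, not the actual plugin code:

```java
import java.util.*;

class RmMerger {

    /** A per-shard relevance model: term weights plus the number of top-K docs it covers. */
    static final class ShardRm {
        final Map<String, Double> weights;
        final int docCount;

        ShardRm(Map<String, Double> weights, int docCount) {
            this.weights = weights;
            this.docCount = docCount;
        }
    }

    /**
     * Mixes the per-shard models into a global one, weighting each shard's
     * model by the fraction of top-K documents that shard contributed.
     */
    static Map<String, Double> merge(List<ShardRm> shardRms) {
        int totalDocs = shardRms.stream().mapToInt(rm -> rm.docCount).sum();
        Map<String, Double> merged = new HashMap<>();
        for (ShardRm rm : shardRms) {
            double mix = (double) rm.docCount / totalDocs;  // this shard's share of the top K
            rm.weights.forEach((term, w) -> merged.merge(term, w * mix, Double::sum));
        }
        return merged;
    }
}
```

Because each per-shard model is already normalized, the mixture stays a probability distribution over terms.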

Approach 3
While approach 2 performs faster than approach 1, there is still a downside: the RM that is computed on each shard is based on the term vectors of its local documents only (the ones in the top-K search results that it holds), and some models may not produce the best RM due to that.

One other approach that I thought could be interesting to try is that, as part of the original search request, each shard computes a local RM for its top-K documents, and then on the coordinating node I would handle the merge of those RMs into one.

This, as far as I understand, requires extending the search operation in several places:

  1. Add some phase, like FetchSubPhase, let's call it PostSubPhase (for lack of a better name) that a SearchPlugin would provide, and each shard will execute it. It will receive the local search hits, or basically the entire local search response, and be able to compute something, in my case a local RM.
  2. Allow storing some arbitrary item (Streamable?) on the local search response.
  3. Add a PostSearchPhase that would receive the different search responses, extract that Streamable and compute the result. And again, be able to store some arbitrary object (now ToXContent maybe?) on the final SearchResponse.
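To make the three steps above concrete, here is one way the hooks could be shaped. None of these interfaces exist in Elasticsearch; the names, type parameters, and signatures are all invented purely for illustration:

```java
import java.util.List;

/** Hypothetical shard-level hook (step 1): runs on each shard after the query phase. */
interface PostSubPhase<T> {
    // hitIds stands in for the shard-local top hits; T is the arbitrary
    // per-shard payload that would have to be serializable (step 2).
    T executeOnShard(List<String> hitIds);
}

/** Hypothetical coordinator-level hook (step 3): reduces the per-shard payloads. */
interface PostSearchPhase<T, R> {
    R reduce(List<T> shardResults);
}
```

A plugin would register one implementation of each, and the coordinating node would render the reduced result R onto the final SearchResponse.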

@s1monw now that you understand better what I want to do, and given your comment about implementing a custom aggregation - do you think this could still be implemented as an aggregation? Even though it will operate on a final set of results, usually much much smaller than the entire search space. I've looked again for existing aggregations implementations, and while I see some that do not work on all search space, I still can't see how an aggregator receives the result SearchHits... if there is a way, I'd like to explore that.

thanks for the long and detailed response @Shai_Erera - lemme reply on approach 3.

What you want is basically something that executes on the top N of the query phase, reduces the shard responses on the coordinator, and whose response lives right next to aggs / highlighting etc. But this is exactly my point: I think it will be very complicated. We have such a construct already, called aggs. They have all the relevant code, but they don't execute on the top N. Yet, we might want to think about a built-in phase that can run aggs on the top N only. I wonder if that would help here. @jpountz @colings86 what do you think?

I also think that FetchSubPhase is too late: you want these results even if you have size=0, and we skip the fetch phase in that case.

Can you please clarify whether by complicated you mean complicated for my custom code to handle, or for ES to support natively? I have already implemented approach 2, and in my action I merge the different shard responses.

Right, I gave it as an example, but I agree it should probably be something like a PostSubPhase, that receives all search results.

Exactly! In my action today, if you specify size = 0, you will get 0 expanded terms. This is another downside to approach 2 - today the coordinating node must receive the entire search response, and e.g. if you wanted to compute an RM from the top-50 documents, but show the user only 10 documents, you have to adjust the search request accordingly.

Hmm ... now that I write that, I wonder if I could still implement an aggregation which just keeps track of the collected documents and computes the top N, where N relates to the number of documents from which you want to build the RM (and could be different from K, the number of search results you want to return to the user), and then builds the RM.

I think I could write code very similar to TopHitsAggregator, and once I have the top docs per bucket (I would have just one bucket, right?), I will extract the term vectors of each and build the RM ... what do you think?

Even if that works, I would still appreciate if we can discuss extending the current search pipeline to allow some custom operations and response objects, if possible.

well we have to add:

  • code / extension points that execute on the shard
  • allow for serialization to and from the coordinator (the request response part)
  • code / extension points to do the merging that allows also to merge incrementally
  • add custom request / response part to search request that can be extended
  • require xcontent rendering

it's non-trivial IMO, and I do think we have that already: it's called aggregations. Yet, in your special case you want them to run on the top N.

Check out the sampler aggregation - you would use it as a parent agg to your custom agg in order to filter to the top-matching docs.
Similar documents rank similarly so you may also want to take a look at the diversified_sampler aggregation.
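Concretely, the sampler-as-parent setup suggested here would look something like the request below. `shard_size` controls the per-shard top-N sample, and `my_rm_agg` stands for the hypothetical custom aggregation discussed in this thread, not a built-in:

```json
GET /index/_search
{
  "query": { ... },
  "aggs": {
    "top_docs_sample": {
      "sampler": { "shard_size": 50 },
      "aggs": {
        "my_rm_agg": { ... }
      }
    }
  }
}
```

The sampler restricts every child aggregation to the best-scoring documents on each shard, which is exactly the "run on the top N only" behavior discussed above.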

@Shai_Erera I agree with @s1monw that this sounds like something that could be solved in aggregations.

I think you could probably do this either, as you suggested, with a custom aggregator which keeps only the top N documents like the TopHitsAggregator does, or as a custom aggregator that collects documents from a parent sampler aggregator as @Mark_Harwood suggested. Both of these options would give you:

  • Only the Top N hits per shard rather than all the documents matching the query
  • An opportunity to compute a per shard response which can be serialised to the coordinating node (as an implementation of InternalAggregation)
  • An opportunity to combine the results from each shard on the coordinating node (by implementing InternalAggregation.reduce())
  • A portion of the search response where you can insert arbitrary information for your calculation, name-spaced away from other parts of the response (albeit inside the aggregations part of the search response)

In practice statistical analysis of words in top-matching results is often thrown off by near-dups which rank similarly.
The significant_text aggregation has built-in functions to cleanse streams of text of near duplicates prior to performing statistical analysis. See https://youtu.be/zH7bizwjj20?t=4m
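For reference, a sampler-scoped significant_text request with its duplicate-text filtering enabled looks roughly like this (the `body` field name is illustrative):

```json
GET /index/_search
{
  "query": { "match": { "body": "..." } },
  "aggs": {
    "sample": {
      "sampler": { "shard_size": 100 },
      "aggs": {
        "keywords": {
          "significant_text": {
            "field": "body",
            "filter_duplicate_text": true
          }
        }
      }
    }
  }
}
```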

Thanks guys for your tips, I will certainly retry the aggregation route (seems more promising now)! @Mark_Harwood I've looked at significant_text aggregation and I'll see if I can evaluate it against the current query expansion techniques we have.

Yes, that's right @colings86. Still, I hope that ES would allow extending the search process, such that I will not have to use only existing structures to store my payload in. I realize it's not necessarily a trivial task, and I'm willing to take a stab at it, if you guys are open to explore that.

In the meanwhile, I'll explore existing hooks (e.g. aggregations) and I also have my approach 2 in case an aggregation (or highlight/suggest...) will not fit the task.

I recommend indexing with shingle size 2 as per https://youtu.be/HH7EnULR6KM
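Index settings for two-word shingles might look like this; the analyzer and filter names are illustrative, while the `shingle` token filter and its `min_shingle_size` / `max_shingle_size` parameters are the standard Elasticsearch ones:

```json
PUT /index
{
  "settings": {
    "analysis": {
      "filter": {
        "bigrams": {
          "type": "shingle",
          "min_shingle_size": 2,
          "max_shingle_size": 2
        }
      },
      "analyzer": {
        "bigram_analyzer": {
          "tokenizer": "standard",
          "filter": ["lowercase", "bigrams"]
        }
      }
    }
  }
}
```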

This topic was automatically closed 28 days after the last reply. New replies are no longer allowed.