What is the proper design pattern for plugins that must submit multiple requests to Elasticsearch?

Question for plugin authors

What is the proper way to implement a REST handler in a custom ActionPlugin (a.k.a. API extension plugin) that must wait for the response of multiple Elasticsearch requests before returning a response to the user?

Example

Consider the example handler below, which sends two requests to Elasticsearch on behalf of a single request from the user. The handler uses its given NodeClient to:

  1. Create an index
  2. Perform a search
  3. Respond to the user after both actions have completed
@Override
protected RestChannelConsumer prepareRequest(final RestRequest request, final NodeClient client) {
    return channel -> {
        try {
            
            // Create an arbitrary index
            client.admin().indices().prepareCreate("sample-index")
                .setSettings(Settings.builder()
                        .put("index.number_of_shards", 1)
                        .put("index.number_of_replicas", 0)
                )
                .addMapping("doc", "{\"properties\":{\"foo\":{\"type\":\"keyword\"}}}", XContentType.JSON)
                .get();
            
            // Submit an arbitrary search request
            client.prepareSearch("*").get();
            
            // Return an arbitrary response
            XContentBuilder content = XContentFactory.jsonBuilder();
            content.startObject().field("acknowledged", true).endObject();
            channel.sendResponse(new BytesRestResponse(RestStatus.OK, content));
            
        } catch (final Exception e) {
            channel.sendResponse(new BytesRestResponse(channel, e));
        }
    };
}

Problem

The problem with the example above is that the two .get() actions submitted by the NodeClient are blocking calls (see explanation). This will cause an Elasticsearch cluster to hang.

What is the proper way to implement the desired behavior of the example above?

You generally write your own ActionListener<...> and pass it to the .execute(listener) method rather than calling .get(). Your listener's onResponse() method can then run the next step of the process.

There's a bunch of utilities to make this a bit simpler and/or to encapsulate common patterns, e.g. ActionListener#delegateFailure, ActionListener#map, RestResponseListener<>, StepListener<>, ActionRunnable<> etc. It always ends up with things in a slightly funny order, but there's not really a way around that. Native syntax for async code would be nice, but that's not something Java has today.

Would you advise against using something like the CompletableFuture API to wrap Actions? That seems to be the most "native" way to order async work that Java has today.

Yes, IIRC the problem with CompletableFuture is that it swallows Throwable so it means you risk missing something vitally important like an OutOfMemoryException or an AssertionError. There's things like PlainActionFuture and PlainListenableActionFuture in Elasticsearch that do the same sort of thing but which only catch Exception which is much safer.

Can you expand on what you mean by "swallow"? Shouldn't those errors still be reported in CompletableFuture#exceptionally, as long as the actions are wrapped correctly? I think the main benefit here is that it makes controlling the async flow much, much simpler with easy control over threads, composing async work, etc.

Ehh I'm probably not the best person to go into the details here so I might be working off of incorrect or dated information. I'm haven't used CompletableFuture very much at all.

There are places where we permit completing a listener twice, and we don't observe the result of every listener either, both of which I think might swallow a vital Error if listeners caught them. ActionListener and CompletableFuture are almost interchangeable, but this one subtle point means that they're not.

AFAIK these are design decisions that Elasticsearch made before the whole Future framework landed in the JDK and it's a little unfortunate that they don't align.

Ah, that makes a lot of sense, thanks David! Yeah, a bit unfortunate about the diverged async frameworks, but good to know about. Last question, promise! Do you know if there are any general situations/ guidelines on where listeners are completed twice, or is it on a case-by-case basis?

No, I don't know of a general pattern. I tried to make a change once that enforced once-and-only-once completion (by throwing an AssertionError on a double-call) just to see how bad it was. It broke all the things. There's stuff like GroupedActionListener<> that deliberately gets called N times, and things like timeouts are implemented by racing to complete a listener too.


(edit) Also it's pretty common that if onResponse throws an exception then it's passed to onException of the same listener. There were loads of other things too, it would be a major piece of work to migrate.

1 Like

Just adding some more notes from digging around, it looks like there is a bit of CompletableFuture usage in ES, mostly in the transport layer, wrapped by a CompletableContext, which interops with ActionListeners mostly via CloseableConnection and ActionListener#toBiConsumer. Not to go against your suggestion, just taking a look at how that API is making its way into ES.

That's an interesting question. CompletableContext was added to isolate the usage of CompletableFuture in the transport layer to avoid catching Throwable:

I'm not sure why the implementation is still based on CompletableFuture, I think there are other viable options too, I'll have to ask around.

Oh, that's a very interesting history. Do you know the reason behind the pretty strict differentiation between Exception and Error in Elasticsearch? What's the issue with using Throwable?

Just following the docs:

An Error is a subclass of Throwable that indicates serious problems that a reasonable application should not try to catch.

Elasticsearch is a reasonable application and it therefore tries hard not to catch any Error. If an Error is thrown then there's no sensible way to recover or handle it, all you can do is exit.

1 Like

Future readers:

Check out this blog post which proposes some elegant solutions with code examples for asynchronous usage of the Elasticsearch 7.x Java APIs:

See also: This PR for a CompletableFuture implementation that is slated for Elasticsearch 8.x.

I can't recommend the examples in that blog post for the same reasons we were discussing above: they catch (and sometimes silently swallow) an Error which no reasonable application should do.

The PR you link may get merged eventually but we're definitely not committing to merging it into any particular version. The 8.0.0 label is only because a version is required on PRs and this is the largest number available today. The whole point of that PR is to prevent folks from using a bare CompletableFuture since it might accidentally swallow an Error.

FWIW we just merged a PR that removes some other usages of CompletableFuture that we had inadvertently introduced:

This topic was automatically closed 28 days after the last reply. New replies are no longer allowed.