ThreadContext API

Hi, this is a follow-up question to https://github.com/elastic/elasticsearch/issues/40391.

I'm struggling to understand how to properly use the org.elasticsearch.common.util.concurrent.ThreadContext API.

The javadoc says:

/**
 * A ThreadContext is a map of string headers and a transient map of keyed objects that are associated with
 * a thread. It allows to store and retrieve header information across method calls, network calls as well as threads spawned from a
 * thread that has a {@link ThreadContext} associated with. Threads spawned from a {@link org.elasticsearch.threadpool.ThreadPool}
 * have out of the box support for {@link ThreadContext} and all threads spawned will inherit the {@link ThreadContext} from the thread
 * that it is forking from.". Network calls will also preserve the senders headers automatically.
 * <p>
 * Consumers of ThreadContext usually don't need to interact with adding or stashing contexts. Every elasticsearch thread is managed by
 * a thread pool or executor being responsible for stashing and restoring the threads context. For instance if a network request is
 * received, all headers are deserialized from the network and directly added as the headers of the threads {@link ThreadContext}
 * (see {@link #readHeaders(StreamInput)}. In order to not modify the context that is currently active on this thread the network code
 * uses a try/with pattern to stash it's current context, read headers into a fresh one and once the request is handled or a handler thread
 * is forked (which in turn inherits the context) it restores the previous context. For instance:
 * </p>
 * <pre>
 *     // current context is stashed and replaced with a default context
 *     try (StoredContext context = threadContext.stashContext()) {
 *         threadContext.readHeaders(in); // read headers into current context
 *         if (fork) {
 *             threadPool.execute(() -&gt; request.handle()); // inherits context
 *         } else {
 *             request.handle();
 *         }
 *     }
 *     // previous context is restored on StoredContext#close()
 * </pre>
 *
 */

In the example given, stashing the current context and having it restored automatically by the close() call at the end of the try block might help avoid the IllegalArgumentException: value for key [test-transient] already present described in https://github.com/elastic/elasticsearch/issues/40391
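The stash/restore behaviour the javadoc describes can be sketched without Elasticsearch. ToyThreadContext below is a hypothetical, heavily simplified stand-in (not the real ThreadContext, whose internals differ): a thread-local map of transients whose putTransient rejects duplicate keys, plus a stashContext() that swaps in an empty map until the returned handle is closed. StashDemo is likewise an invented name.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical, simplified stand-in for Elasticsearch's ThreadContext (NOT the real class):
// a per-thread map of transient values that can be stashed and later restored.
final class ToyThreadContext {
    interface StoredContext extends AutoCloseable {
        @Override
        void close(); // restores the stashed context; throws no checked exception
    }

    private final ThreadLocal<Map<String, Object>> current = ThreadLocal.withInitial(HashMap::new);

    void putTransient(String key, Object value) {
        if (current.get().putIfAbsent(key, value) != null) {
            throw new IllegalArgumentException("value for key [" + key + "] already present");
        }
    }

    Object getTransient(String key) {
        return current.get().get(key);
    }

    // Swaps in a fresh, empty context; closing the handle puts the previous one back.
    StoredContext stashContext() {
        Map<String, Object> previous = current.get();
        current.set(new HashMap<>());
        return () -> current.set(previous);
    }
}

final class StashDemo {
    static String run() {
        ToyThreadContext tc = new ToyThreadContext();
        tc.putTransient("key", "outer");
        StringBuilder seen = new StringBuilder();
        try (ToyThreadContext.StoredContext ignored = tc.stashContext()) {
            seen.append(tc.getTransient("key")).append(','); // fresh context: null
            tc.putTransient("key", "inner");                 // no clash with "outer"
            seen.append(tc.getTransient("key")).append(','); // "inner"
        }
        seen.append(tc.getTransient("key"));                 // "outer" again after close()
        return seen.toString();
    }
}
```

StashDemo.run() returns "null,inner,outer": the stashed context starts empty, accepts its own "key" without an "already present" error, and the original value is visible again once the try block ends.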

However, it doesn't cover my main use case: propagating a value across the different threads that handle the same request, i.e.:

  • Setting a header in my ActionFilter
  • Recovering the header value from my IndexSearcherWrapper

What's the best/simplest way to achieve this?

Could you explain a bit more what you've tried and in what way it's not working? Looking at SecurityActionFilter, for instance, values do seem to propagate across different threads (and, indeed, across the network) when handling the same request.

Hi David, thanks for replying. As I said, I need to write a header (or a transient) in my implementation of ActionFilter, and read it in my implementation of IndexSearcherWrapper.

I tried wrapping both the code that writes the header (in the ActionFilter) and the code that reads it (in the IndexSearcherWrapper) in try statements similar to the one in the documentation, without luck.

Here is how:

In my ActionFilter:

  @Override
  public <Request extends ActionRequest, Response extends ActionResponse> void apply(
      Task task,
      String action,
      Request request,
      ActionListener<Response> listener,
      ActionFilterChain<Request, Response> chain) {

    try (ThreadContext.StoredContext ctx = threadPool.getThreadContext().stashContext()) {
      threadPool.getThreadContext().putTransient("key", "Value");
    }
    ...
    chain.proceed(task, action, request, listener);
  }

In my IndexSearcherWrapper:

  @Override
  protected DirectoryReader wrap(DirectoryReader reader) throws IOException {
    try (ThreadContext.StoredContext ctx = threadPool.getThreadContext().stashContext()) {
      System.out.println("TRANSIENT PARAM: >>>>>>>>>>>> " + threadPool.getThreadContext().getTransient("key"));
    }
    return reader;
  }

The result of this is

TRANSIENT PARAM: >>>>>>>>>>>> null

There must be something obvious I'm missing.

About the ActionFilter: this seems wrong. You're creating a context, putting a value into it, and then discarding the context before proceeding. I think you should be calling chain.proceed within the try block.

About the IndexSearcherWrapper: this also seems wrong. You are calling stashContext(), which temporarily hides the current context and starts again with a brand new, empty one. I think you should not be calling .stashContext() here.
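Both problems can be reproduced with a toy model. In the sketch below, ToyThreadContext is a hypothetical stand-in for ThreadContext (not the real class), and brokenFilter/fixedFilter are invented simplifications of the ActionFilter above, with a Supplier playing the role of chain.proceed and of the code that eventually reads the value. The sketch is synchronous; in Elasticsearch, work forked onto its thread pools inherits the context that is active when it is forked, as the javadoc above says.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.function.Supplier;

// Hypothetical, simplified stand-in for ThreadContext (NOT the real class).
final class ToyThreadContext {
    interface StoredContext extends AutoCloseable {
        @Override
        void close(); // restores the stashed context
    }

    private final ThreadLocal<Map<String, Object>> current = ThreadLocal.withInitial(HashMap::new);

    void putTransient(String key, Object value) {
        if (current.get().putIfAbsent(key, value) != null) {
            throw new IllegalArgumentException("value for key [" + key + "] already present");
        }
    }

    Object getTransient(String key) {
        return current.get().get(key);
    }

    StoredContext stashContext() {
        Map<String, Object> previous = current.get();
        current.set(new HashMap<>());
        return () -> current.set(previous);
    }
}

final class FilterDemo {
    // Like the original ActionFilter: the value lives only inside the try block,
    // so it has already been discarded when the chain proceeds.
    static Object brokenFilter(ToyThreadContext tc, Supplier<Object> chain) {
        try (ToyThreadContext.StoredContext ignored = tc.stashContext()) {
            tc.putTransient("key", "Value");
        }
        return chain.get();
    }

    // Suggested fix: proceed while the context holding the value is still active.
    static Object fixedFilter(ToyThreadContext tc, Supplier<Object> chain) {
        try (ToyThreadContext.StoredContext ignored = tc.stashContext()) {
            tc.putTransient("key", "Value");
            return chain.get();
        }
    }

    static String run() {
        ToyThreadContext tc = new ToyThreadContext();
        // Reader that just looks at the current context (no stash).
        Supplier<Object> reader = () -> tc.getTransient("key");
        // Reader that stashes first, like the original wrap() method.
        Supplier<Object> stashingReader = () -> {
            try (ToyThreadContext.StoredContext ignored = tc.stashContext()) {
                return tc.getTransient("key");
            }
        };
        return brokenFilter(tc, reader) + "," + fixedFilter(tc, reader) + "," + fixedFilter(tc, stashingReader);
    }
}
```

FilterDemo.run() returns "null,Value,null": the broken filter loses the value before proceeding, the fixed filter keeps it visible to a reader that doesn't stash, and a reader that stashes only ever sees an empty context.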

Also, are you sure you want to use .putTransient and not .putHeader? The former is node-local, whereas the latter preserves the context across nodes.
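To illustrate the difference, here is a hypothetical toy model (not Elasticsearch code) of a context with both parts. In Elasticsearch, headers are string-to-string and are carried along by network calls, while transients are arbitrary objects that stay on the local node; the receivedOnRemoteNode() method below is an invented stand-in for that network hop.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical toy model of a context's two parts; not Elasticsearch code.
final class ToyContext {
    final Map<String, String> headers = new HashMap<>();    // strings: serializable
    final Map<String, Object> transients = new HashMap<>(); // any object: node-local

    // Invented stand-in for a network hop: only headers are "sent", so the
    // receiving node starts with the same headers and no transients.
    ToyContext receivedOnRemoteNode() {
        ToyContext remote = new ToyContext();
        remote.headers.putAll(headers);
        return remote;
    }

    static String run() {
        ToyContext local = new ToyContext();
        local.headers.put("my-header", "Value");
        local.transients.put("my-transient", new Object());
        ToyContext remote = local.receivedOnRemoteNode();
        return remote.headers.get("my-header") + "," + remote.transients.get("my-transient");
    }
}
```

ToyContext.run() returns "Value,null": the header survives the hop, the transient does not.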

Hi @David,

Thanks for the feedback! I switched to wrapping only the part that sets the header, and now it seems to work ok in my dummy plugin.

However, the problem arises in my actual plugin code, where it throws the same "value for key already present" exception. I think it's because I'm making network calls using CompletableFuture, which defaults to ForkJoinPool. That's a different thread pool, and it may be the problem.

I will try supplying the threadPool.executor to it and report back.
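ThreadContext is bound to the current thread, so a task running on a thread the context wasn't propagated to (for example a ForkJoinPool.commonPool() thread, which CompletableFuture's *Async methods use by default) won't see it. The JDK-only sketch below shows the usual workaround: capture the caller's context when a task is submitted and install it on the worker while the task runs. CONTEXT, preserveContext and contextPreserving are hypothetical names, and a single-thread pool replaces ForkJoinPool to keep the result deterministic. Elasticsearch's own thread pool executors already propagate the context, and ThreadContext appears to offer a preserveContext(Runnable) method with the same intent; check the API of your version.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Executor;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

final class ContextPropagationDemo {
    // Hypothetical per-thread context, standing in for ThreadContext's transients.
    static final ThreadLocal<Map<String, Object>> CONTEXT = ThreadLocal.withInitial(HashMap::new);

    // Captures the submitting thread's context now, installs it on the worker
    // while the task runs, then puts the worker's own context back.
    static Runnable preserveContext(Runnable task) {
        Map<String, Object> captured = new HashMap<>(CONTEXT.get());
        return () -> {
            Map<String, Object> workersOwn = CONTEXT.get();
            CONTEXT.set(captured);
            try {
                task.run();
            } finally {
                CONTEXT.set(workersOwn);
            }
        };
    }

    // An Executor that wraps every task it is given with preserveContext.
    static Executor contextPreserving(Executor delegate) {
        return task -> delegate.execute(preserveContext(task));
    }

    static String run() {
        ExecutorService pool = Executors.newFixedThreadPool(1);
        try {
            CONTEXT.get().put("key", "Value");
            // Plain executor: the worker thread has its own, empty context.
            Object plain = CompletableFuture
                    .supplyAsync(() -> CONTEXT.get().get("key"), pool)
                    .join();
            // Wrapped executor: the caller's context travels with the task.
            Object preserved = CompletableFuture
                    .supplyAsync(() -> CONTEXT.get().get("key"), contextPreserving(pool))
                    .join();
            return plain + "," + preserved;
        } finally {
            CONTEXT.remove();
            pool.shutdown();
        }
    }
}
```

ContextPropagationDemo.run() returns "null,Value". The wrapper works because supplyAsync hands the task to the executor on the calling thread, so the capture happens before the task changes threads.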

If Elasticsearch is complaining that you're trying to set a key twice, then I suspect you are not properly processing each request in its own context. Each request needs a separate one of these:

    try (ThreadContext.StoredContext ctx = threadPool.getThreadContext().stashContext()) {
        ...
    }
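The "already present" failure can be reproduced with a hypothetical toy context when one thread handles two requests. ToyThreadContext (a simplified stand-in, not the real ThreadContext) and PerRequestDemo are invented for illustration: without a stash, the first request's value is still in the thread's context when the second request puts its own; with one stash per request, each put lands in a fresh context that is discarded when that request is done.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical, simplified stand-in for ThreadContext (NOT the real class).
final class ToyThreadContext {
    interface StoredContext extends AutoCloseable {
        @Override
        void close(); // restores the stashed context
    }

    private final ThreadLocal<Map<String, Object>> current = ThreadLocal.withInitial(HashMap::new);

    void putTransient(String key, Object value) {
        if (current.get().putIfAbsent(key, value) != null) {
            throw new IllegalArgumentException("value for key [" + key + "] already present");
        }
    }

    StoredContext stashContext() {
        Map<String, Object> previous = current.get();
        current.set(new HashMap<>());
        return () -> current.set(previous);
    }
}

final class PerRequestDemo {
    // Handles one (hypothetical) request on the current thread.
    static void handle(ToyThreadContext tc, String requestId, boolean ownContext) {
        if (ownContext) {
            // One stash per request: the put lands in a fresh context that is
            // discarded again when the request is done.
            try (ToyThreadContext.StoredContext ignored = tc.stashContext()) {
                tc.putTransient("key", requestId);
                // ... the request would be processed here ...
            }
        } else {
            // No stash: the value stays in the thread's context after the request.
            tc.putTransient("key", requestId);
        }
    }

    static String run(boolean ownContext) {
        ToyThreadContext tc = new ToyThreadContext();
        try {
            handle(tc, "request-1", ownContext);
            handle(tc, "request-2", ownContext);
            return "ok";
        } catch (IllegalArgumentException e) {
            return e.getMessage();
        }
    }
}
```

PerRequestDemo.run(true) returns "ok", while run(false) returns "value for key [key] already present".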

It's not easy to give much more help without seeing more code, but hopefully now you've got something working you can start to bisect your way to what you want.

Yes, perhaps you want to use Elasticsearch's thread pools. You should also be wary of CompletableFuture, because it's easy to swallow fatal errors such as AssertionError, StackOverflowError or OutOfMemoryError when using it. Elasticsearch provides safer alternatives like PlainActionFuture and PlainListenableActionFuture that are usually sufficient.
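The point about swallowed errors can be seen with the JDK alone. In the sketch below, an AssertionError thrown inside runAsync completes the future exceptionally instead of reaching the worker thread's uncaught-exception handler, so it only surfaces if something calls join() or get(). SwallowedErrorDemo is a hypothetical name, and the sketch doesn't use PlainActionFuture or any other Elasticsearch class.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.atomic.AtomicReference;

final class SwallowedErrorDemo {
    static String run() {
        // Records anything that escapes the worker thread.
        AtomicReference<Throwable> uncaught = new AtomicReference<>();
        ExecutorService pool = Executors.newSingleThreadExecutor(runnable -> {
            Thread t = new Thread(runnable);
            t.setUncaughtExceptionHandler((thread, error) -> uncaught.set(error));
            return t;
        });
        try {
            CompletableFuture<Void> future = CompletableFuture.runAsync(() -> {
                throw new AssertionError("boom"); // a "fatal" error in the async task
            }, pool);

            // The error was caught by the future and stored in it; it is only
            // rethrown (wrapped in a CompletionException) when someone joins.
            Throwable stored = null;
            try {
                future.join();
            } catch (CompletionException e) {
                stored = e.getCause();
            }
            return "uncaught=" + uncaught.get() + ",stored=" + stored;
        } finally {
            pool.shutdown();
        }
    }
}
```

SwallowedErrorDemo.run() returns "uncaught=null,stored=java.lang.AssertionError: boom". Had nothing called join(), the error would have gone unnoticed.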

