Elasticsearch transport client spawning a large number of threads


(Ash S) #1

From an Apache Storm bolt, I am using Elasticsearch's transport client to connect remotely to an ES cluster. When I take a jstack dump of the Storm process, I see nearly 1000 threads with an ES stack trace like:

"elasticsearch[Flying Tiger][transport_client_worker][T#22]{New I/O worker #269}" daemon prio=10 tid=0x00007f80ac3cb000 nid=0x356b runnable [0x00007f7e96b2a000]
   java.lang.Thread.State: RUNNABLE
        at sun.nio.ch.EPollArrayWrapper.epollWait(Native Method)
        at sun.nio.ch.EPollArrayWrapper.poll(EPollArrayWrapper.java:269)
        at sun.nio.ch.EPollSelectorImpl.doSelect(EPollSelectorImpl.java:79)
        at sun.nio.ch.SelectorImpl.lockAndDoSelect(SelectorImpl.java:87)
        - locked <0x00000000d148d138> (a sun.nio.ch.Util$2)
        - locked <0x00000000d148d128> (a java.util.Collections$UnmodifiableSet)
        - locked <0x00000000d148c9b8> (a sun.nio.ch.EPollSelectorImpl)
        at sun.nio.ch.SelectorImpl.select(SelectorImpl.java:98)
        at org.elasticsearch.common.netty.channel.socket.nio.SelectorUtil.select(SelectorUtil.java:68)
        at org.elasticsearch.common.netty.channel.socket.nio.AbstractNioSelector.select(AbstractNioSelector.java:415)
        at org.elasticsearch.common.netty.channel.socket.nio.AbstractNioSelector.run(AbstractNioSelector.java:212)
        at org.elasticsearch.common.netty.channel.socket.nio.AbstractNioWorker.run(AbstractNioWorker.java:89)
        at org.elasticsearch.common.netty.channel.socket.nio.NioWorker.run(NioWorker.java:178)
        at org.elasticsearch.common.netty.util.ThreadRenamingRunnable.run(ThreadRenamingRunnable.java:108)
        at org.elasticsearch.common.netty.util.internal.DeadLockProofWorker$1.run(DeadLockProofWorker.java:42)
        at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
        at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
        at java.lang.Thread.run(Thread.java:745)

I am using a single instance of the ES transport client across my Storm topology, which has about 18 output streams invoking the ES client to write data to the ES cluster.

Why does the ES transport client spawn so many threads? Is there any way I can tune this? I looked through the ES documentation, but it gives no internal details on the transport client's threading mechanism, nor does it offer an option to tune the number of client threads.


(Ilija Subasic) #2

Hi Ash,
I have observed a similar problem with my setup. Did you ever find out the source of it (and possibly a fix)?
thanks,
ilija


(Jason Wee) #3

If I understand correctly, the search threadpool is unbounded. There are settings you can pass when constructing the transport client, something like:

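import org.elasticsearch.client.transport.TransportClient;
import org.elasticsearch.common.settings.ImmutableSettings;
import org.elasticsearch.common.settings.Settings;

Settings settings = ImmutableSettings.settingsBuilder()
        .put("cluster.name", "superMegaCluster")
        .put("network.tcp.connect_timeout", 10000)
        .put("client.transport.nodes_sampler_interval", 10000)
        .put("client.transport.sniff", true)
        // cap the search threadpool: fixed size with a bounded queue
        .put("threadpool.search.type", "fixed")
        .put("threadpool.search.size", 8)
        .put("threadpool.search.queue_size", 100)
        .build();
TransportClient tc = new TransportClient(settings);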

hth


(Ilija Subasic) #4

Hi,
Isn't it the case that if the queue_size is reached no more requests are being processed?


(Jörg Prante) #5

There is a misconception. Threadpools in TransportClient are not used for search actions. The threadpool in TransportClient manages the network connections to other nodes and the returning of response objects to consumer threads in an application.

It is often the case that threads in TransportClient are waiting for outstanding actions on other nodes in the cluster but the nodes could not answer. You can check this by examining a thread dump for threads that are in BLOCKED state.
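The same check can also be done from inside the JVM instead of reading a jstack dump by hand. Below is a minimal sketch using only the JDK (Java 8+); the class name `ThreadStateCounter` and the method `countByState` are made up for illustration, and the name fragment `transport_client_worker` is taken from the stack trace in the first post.

```java
import java.util.EnumMap;
import java.util.Map;

public class ThreadStateCounter {

    /** Counts live threads whose name contains nameFragment, grouped by thread state. */
    public static Map<Thread.State, Integer> countByState(String nameFragment) {
        Map<Thread.State, Integer> counts = new EnumMap<>(Thread.State.class);
        // getAllStackTraces() returns a snapshot of all live threads in this JVM
        for (Thread t : Thread.getAllStackTraces().keySet()) {
            if (t.getName().contains(nameFragment)) {
                counts.merge(t.getState(), 1, Integer::sum);
            }
        }
        return counts;
    }

    public static void main(String[] args) {
        // Prints one entry per state that occurs, e.g. RUNNABLE, BLOCKED, WAITING
        System.out.println(countByState("transport_client_worker"));
    }
}
```

Calling `countByState` periodically from the application (for example from a logging hook) would show whether the number of BLOCKED client threads grows over time.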

Another possibility for high thread count is when a TransportClient is used to connect to dozens or even hundreds of nodes. Each node may take 20-30 connections when very active; this also depends on the number of CPU cores. Once activated, threads keep on existing for a certain amount of time, waiting for more actions to execute.
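To see where the threads in a dump come from, it can help to tally them by the client name and pool embedded in each thread name. The following is a rough sketch (Java 8+, JDK only), assuming thread names follow the format shown in the first post, `elasticsearch[<name>][<pool>][T#<n>]`; the class `EsThreadTally` and its method are hypothetical names for illustration.

```java
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Paths;
import java.util.Map;
import java.util.TreeMap;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

public class EsThreadTally {

    // Matches names such as: elasticsearch[Flying Tiger][transport_client_worker][T#22]
    private static final Pattern ES_THREAD =
            Pattern.compile("elasticsearch\\[([^\\]]+)\\]\\[([^\\]]+)\\]\\[T#\\d+\\]");

    /** Returns "name/pool" -> number of matching thread names found in the dump text. */
    public static Map<String, Integer> tally(String dump) {
        Map<String, Integer> counts = new TreeMap<>();
        Matcher m = ES_THREAD.matcher(dump);
        while (m.find()) {
            counts.merge(m.group(1) + "/" + m.group(2), 1, Integer::sum);
        }
        return counts;
    }

    public static void main(String[] args) throws Exception {
        // args[0]: path to a file containing jstack output
        String dump = new String(Files.readAllBytes(Paths.get(args[0])), StandardCharsets.UTF_8);
        tally(dump).forEach((key, n) -> System.out.println(n + "\t" + key));
    }
}
```

If the output lists many different names rather than one, that would suggest several client instances are alive in the same JVM, which is worth ruling out before looking at per-node connection counts.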


(Ilija Subasic) #6

Thank you @jprante. This is really useful. Do you know what happens when the thread queue reaches its limit (on the nodes)? Are the incoming requests discarded, or is there some timeout (on the client) that waits until a thread is freed? We profiled our application and saw that 35% of the time on the client is spent in sun.nio.ch.SelectorImpl.select().

So we are thinking of increasing the search thread count and the queue size as a possible solution. Our queries are not long and take around 200ms, but we have several million over a day (2 powerful nodes).


(Jason Wee) #7

Is it the transport client node or the Elasticsearch node that hangs?


(Jörg Prante) #8

You will see EsRejectedExecutionException in the node logs. See also https://github.com/elastic/elasticsearch/blob/1.7/src/test/java/org/elasticsearch/action/RejectionActionTests.java, which tests how tiny queues react (in ES 1.7).
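As an analogy only, the general behaviour of a fixed-size pool with a bounded queue can be sketched with the plain JDK `ThreadPoolExecutor`. This is not Elasticsearch's own code; the class `BoundedQueueDemo` and its method are hypothetical, and the sizes 8 and 100 simply mirror the settings quoted earlier in the thread. Once all threads are busy and the queue is full, further submissions are rejected immediately rather than waiting.

```java
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

public class BoundedQueueDemo {

    /** Submits `tasks` blocking tasks to a fixed pool and returns how many were rejected. */
    public static int countRejections(int threads, int queueSize, int tasks) {
        // Fixed pool with a bounded queue; the default handler (AbortPolicy) throws on overflow
        ThreadPoolExecutor pool = new ThreadPoolExecutor(
                threads, threads, 0L, TimeUnit.MILLISECONDS,
                new ArrayBlockingQueue<Runnable>(queueSize));
        CountDownLatch release = new CountDownLatch(1);
        int rejected = 0;
        try {
            for (int i = 0; i < tasks; i++) {
                try {
                    // Each task blocks until released, so threads and queue stay occupied
                    pool.execute(() -> {
                        try {
                            release.await();
                        } catch (InterruptedException e) {
                            Thread.currentThread().interrupt();
                        }
                    });
                } catch (RejectedExecutionException e) {
                    rejected++;
                }
            }
        } finally {
            release.countDown();
            pool.shutdown();
            try {
                pool.awaitTermination(5, TimeUnit.SECONDS);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        }
        return rejected;
    }

    public static void main(String[] args) {
        // 8 threads + 100 queue slots hold 108 tasks; anything beyond that is rejected
        System.out.println(countRejections(8, 100, 110));
    }
}
```

With 8 threads and a queue of 100, 108 stuck tasks are accepted and the remaining ones fail fast, which is why a full queue shows up as rejection exceptions in logs rather than as requests that hang.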

Spending 35% of the time in sun.nio.ch.SelectorImpl.select() is fine, since this is the JDK code managing the network socket communication. There can be network latency though, which must be checked at the operating system / network administration level; this is outside the scope of Elasticsearch.

Increasing search thread count / queue is not a viable solution. The default setting is fine, because it already can exercise all CPU resources available. If search queues fill up, it mostly means your overall cluster is too slow for the workload pattern. There can be lots of reasons for slowness.


(Jörg Prante) #9

Both. If the transport client has threads in BLOCKED state, it is waiting for responses that are not coming back. On a node, it means other nodes did not respond during inter-node communication (routing requests to other nodes).


(Ilija Subasic) #10

Thanks a lot @jprante! We are looking into what the bottleneck is. So far, the ad-hoc hot_threads analysis shows that getting term_vectors (which we need for some computations) takes around 50-60% of the time ES is actually working. We are keeping only term frequencies. Do you know if there is a more efficient way to store/retrieve term vectors than the default ES way?

Thank you again.


(Jason Wee) #11

thank you. that's very helpful


#12

Looks to me, from reading the code, that the future listeners are executed using the thread pool, at least with the version I'm using (1.5).

