A thread limit for searches in the Java client

Hi,
I've just recently upgraded from ES 2.3.1 to 5.2.1.
When I run the server in production, there seems to be some kind of "limit" (probably threads) after a few consecutive search requests: the requests seem to be waiting for something for over a minute.
I'm using the PreBuiltTransportClient with the following settings:
settings.put("client.transport.sniff", true );
settings.put("client.transport.ignore_cluster_name", true);
settings.put("client.transport.ping_timeout", "10s");
settings.put("client.transport.nodes_sampler_interval", "10s");

Can somebody suggest any settings I should change?

You should not be hitting such limits, and you should not need to change thread pool settings.

Can you share a sample of the code you are running, and the error message you are getting, if any?

The code is something like this:

final PreBuiltTransportClient transportClient = new PreBuiltTransportClient(settings);

for (final String ip : clusterIPsSplit) {
	final String[] ipAndPort = ip.split(IP_PORT_DELIMITER);
	transportClient.addTransportAddress(new InetSocketTransportAddress(InetAddress.getByName(ipAndPort[0]), Integer.valueOf(ipAndPort[1])));
}

QueryBuilder query = ...;
QueryBuilder qb = QueryBuilders.nestedQuery(NESTED_DOC, query, ScoreMode.Total);

final SearchRequestBuilder searchReqBuilder = transportClient
				.prepareSearch("...")
				.setTypes("...")
				.setSearchType(SearchType.DFS_QUERY_THEN_FETCH)
				.setQuery(qb)
				.setFrom(startIndex)
				.setSize(size)
				.setExplain(false);
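For debugging, the same builder can also be executed synchronously, which takes the Rx wiring out of the picture; a one-line sketch:

// Synchronous execution for comparison: blocks the calling thread until the response arrives.
final SearchResponse response = searchReqBuilder.get();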

// Bridges the async ES listener into an RxJava Observable.
private Observable<SearchResponse> observeSearchResult(final SearchRequestBuilder searchReqBuilder) {
	return Observable.create(subscriber -> {
		try {
			searchReqBuilder.execute().addListener(new ActionListener<SearchResponse>() {
				@Override
				public void onResponse(final SearchResponse response) {
					if (!subscriber.isUnsubscribed()) {
						subscriber.onNext(response);
						subscriber.onCompleted();
					}
				}

				@Override
				public void onFailure(final Exception e) {
					if (!subscriber.isUnsubscribed()) {
						subscriber.onError(e);
					}
				}
			});
		}
		catch (final Exception e) {
			if (!subscriber.isUnsubscribed()) {
				subscriber.onError(e);
			}
		}
	});
}
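A hypothetical usage of this method (RxJava 1.x), with an explicit timeout so a stalled request fails fast instead of hanging; the 30-second value is illustrative, not from the original code:

import java.util.concurrent.TimeUnit;

// Subscribe with a timeout; onError fires if no response arrives in time.
observeSearchResult(searchReqBuilder)
		.timeout(30, TimeUnit.SECONDS)
		.subscribe(
				response -> System.out.println("hits: " + response.getHits().getTotalHits()),
				error -> System.err.println("search failed or timed out: " + error));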

The only "migrated" part is the use of PreBuiltTransportClient.

Maybe a thread dump can help somehow?

I see 8 Elasticsearch threads like this:
"elasticsearch[client][generic][T#67]" #179 daemon prio=5 os_prio=0 tid=0x00007f518c102000 nid=0x1c07 waiting on condition [0x00007f5151dad000]
java.lang.Thread.State: TIMED_WAITING (parking)
at sun.misc.Unsafe.park(Native Method)
- parking to wait for <0x00000000ec3bba98> (a org.elasticsearch.common.util.concurrent.EsExecutors$ExecutorScalingQueue)
at java.util.concurrent.locks.LockSupport.parkNanos(LockSupport.java:215)
at java.util.concurrent.LinkedTransferQueue.awaitMatch(LinkedTransferQueue.java:734)
at java.util.concurrent.LinkedTransferQueue.xfer(LinkedTransferQueue.java:647)
at java.util.concurrent.LinkedTransferQueue.poll(LinkedTransferQueue.java:1273)
at java.util.concurrent.ThreadPoolExecutor.getTask(ThreadPoolExecutor.java:1066)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1127)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
at java.lang.Thread.run(Thread.java:745)

This one:
"HttpClient@1269712271-scheduler" #114 prio=5 os_prio=0 tid=0x00000000017c8800 nid=0x74a7 waiting on condition [0x00007f5155eec000]
java.lang.Thread.State: WAITING (parking)
at sun.misc.Unsafe.park(Native Method)
- parking to wait for <0x00000000ec2f2a40> (a java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject)
at java.util.concurrent.locks.LockSupport.park(LockSupport.java:175)
at java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject.await(AbstractQueuedSynchronizer.java:2039)
at java.util.concurrent.ScheduledThreadPoolExecutor$DelayedWorkQueue.take(ScheduledThreadPoolExecutor.java:1081)
at java.util.concurrent.ScheduledThreadPoolExecutor$DelayedWorkQueue.take(ScheduledThreadPoolExecutor.java:809)
at java.util.concurrent.ThreadPoolExecutor.getTask(ThreadPoolExecutor.java:1067)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1127)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
at java.lang.Thread.run(Thread.java:745)

A few like this:
"nioEventLoopGroup-5-10" #113 prio=10 os_prio=0 tid=0x00000000017c3800 nid=0x7429 runnable [0x00007f51565ed000]
java.lang.Thread.State: RUNNABLE
at sun.nio.ch.EPollArrayWrapper.epollWait(Native Method)
at sun.nio.ch.EPollArrayWrapper.poll(EPollArrayWrapper.java:269)
at sun.nio.ch.EPollSelectorImpl.doSelect(EPollSelectorImpl.java:79)
at sun.nio.ch.SelectorImpl.lockAndDoSelect(SelectorImpl.java:86)
- locked <0x00000000edab3528> (a io.netty.channel.nio.SelectedSelectionKeySet)
- locked <0x00000000edb6c280> (a java.util.Collections$UnmodifiableSet)
- locked <0x00000000edab34e0> (a sun.nio.ch.EPollSelectorImpl)
at sun.nio.ch.SelectorImpl.select(SelectorImpl.java:97)
at io.netty.channel.nio.NioEventLoop.select(NioEventLoop.java:731)
at io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:391)
at io.netty.util.concurrent.SingleThreadEventExecutor$5.run(SingleThreadEventExecutor.java:873)
at io.netty.util.concurrent.DefaultThreadFactory$DefaultRunnableDecorator.run(DefaultThreadFactory.java:144)
at java.lang.Thread.run(Thread.java:745)

A few like this:
"defaultEventExecutorGroup-9-5" #108 prio=5 os_prio=0 tid=0x00000000017bb000 nid=0x73ff waiting on condition [0x00007f5156cf2000]
java.lang.Thread.State: WAITING (parking)
at sun.misc.Unsafe.park(Native Method)
- parking to wait for <0x00000000ef6dd198> (a java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject)
at java.util.concurrent.locks.LockSupport.park(LockSupport.java:175)
at java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject.await(AbstractQueuedSynchronizer.java:2039)
at java.util.concurrent.LinkedBlockingQueue.take(LinkedBlockingQueue.java:442)
at io.netty.util.concurrent.SingleThreadEventExecutor.takeTask(SingleThreadEventExecutor.java:252)
at io.netty.util.concurrent.DefaultEventExecutor.run(DefaultEventExecutor.java:64)
at io.netty.util.concurrent.SingleThreadEventExecutor$5.run(SingleThreadEventExecutor.java:873)
at io.netty.util.concurrent.DefaultThreadFactory$DefaultRunnableDecorator.run(DefaultThreadFactory.java:144)
at java.lang.Thread.run(Thread.java:745)

This one:
"elasticsearch[client][listener][T#1]" #92 daemon prio=5 os_prio=0 tid=0x00007f5174545000 nid=0x73db waiting on condition [0x00007f5157f00000]
java.lang.Thread.State: WAITING (parking)
at sun.misc.Unsafe.park(Native Method)
- parking to wait for <0x00000000ec3b3568> (a java.util.concurrent.LinkedTransferQueue)
at java.util.concurrent.locks.LockSupport.park(LockSupport.java:175)
at java.util.concurrent.LinkedTransferQueue.awaitMatch(LinkedTransferQueue.java:737)
at java.util.concurrent.LinkedTransferQueue.xfer(LinkedTransferQueue.java:647)
at java.util.concurrent.LinkedTransferQueue.take(LinkedTransferQueue.java:1265)
at java.util.concurrent.ThreadPoolExecutor.getTask(ThreadPoolExecutor.java:1067)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1127)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
at java.lang.Thread.run(Thread.java:745)

4 threads like this one:
"elasticsearch[client][management][T#5]" #50 daemon prio=5 os_prio=0 tid=0x00007f51a14f5000 nid=0x73b1 waiting on condition [0x00007f51642e1000]
java.lang.Thread.State: TIMED_WAITING (parking)
at sun.misc.Unsafe.park(Native Method)
- parking to wait for <0x00000000ec3bc7b8> (a org.elasticsearch.common.util.concurrent.EsExecutors$ExecutorScalingQueue)
at java.util.concurrent.locks.LockSupport.parkNanos(LockSupport.java:215)
at java.util.concurrent.LinkedTransferQueue.awaitMatch(LinkedTransferQueue.java:734)
at java.util.concurrent.LinkedTransferQueue.xfer(LinkedTransferQueue.java:647)
at java.util.concurrent.LinkedTransferQueue.poll(LinkedTransferQueue.java:1273)
at java.util.concurrent.ThreadPoolExecutor.getTask(ThreadPoolExecutor.java:1066)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1127)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
at java.lang.Thread.run(Thread.java:745)

These 2 boss threads:
"elasticsearch[client][transport_client_boss][T#1]" #41 daemon prio=5 os_prio=0 tid=0x00007f5168002800 nid=0x73a8 runnable [0x00007f5166e28000]
java.lang.Thread.State: RUNNABLE
at sun.nio.ch.EPollArrayWrapper.epollWait(Native Method)
at sun.nio.ch.EPollArrayWrapper.poll(EPollArrayWrapper.java:269)
at sun.nio.ch.EPollSelectorImpl.doSelect(EPollSelectorImpl.java:79)
at sun.nio.ch.SelectorImpl.lockAndDoSelect(SelectorImpl.java:86)
- locked <0x00000000ec525b48> (a io.netty.channel.nio.SelectedSelectionKeySet)
- locked <0x00000000ec525b68> (a java.util.Collections$UnmodifiableSet)
- locked <0x00000000ec525b00> (a sun.nio.ch.EPollSelectorImpl)
at sun.nio.ch.SelectorImpl.select(SelectorImpl.java:97)
at io.netty.channel.nio.NioEventLoop.select(NioEventLoop.java:731)
at io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:391)
at io.netty.util.concurrent.SingleThreadEventExecutor$5.run(SingleThreadEventExecutor.java:873)
at java.lang.Thread.run(Thread.java:745)

And these:
"elasticsearch[client][management][T#1]" #40 daemon prio=5 os_prio=0 tid=0x00007f51a14e8800 nid=0x73a7 waiting on condition [0x00007f5166f29000]
java.lang.Thread.State: TIMED_WAITING (parking)
at sun.misc.Unsafe.park(Native Method)
- parking to wait for <0x00000000ec3bc7b8> (a org.elasticsearch.common.util.concurrent.EsExecutors$ExecutorScalingQueue)
at java.util.concurrent.locks.LockSupport.parkNanos(LockSupport.java:215)
at java.util.concurrent.LinkedTransferQueue.awaitMatch(LinkedTransferQueue.java:734)
at java.util.concurrent.LinkedTransferQueue.xfer(LinkedTransferQueue.java:647)
at java.util.concurrent.LinkedTransferQueue.poll(LinkedTransferQueue.java:1273)
at java.util.concurrent.ThreadPoolExecutor.getTask(ThreadPoolExecutor.java:1066)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1127)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
at java.lang.Thread.run(Thread.java:745)

"elasticsearch[_client_][scheduler][T#1]" #39 daemon prio=5 os_prio=0 tid=0x00007f51a1465000 nid=0x73a6 waiting on condition [0x00007f5166cc8000]
   java.lang.Thread.State: TIMED_WAITING (parking)
	at sun.misc.Unsafe.park(Native Method)
	- parking to wait for  <0x00000000ec3bd448> (a java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject)
	at java.util.concurrent.locks.LockSupport.parkNanos(LockSupport.java:215)
	at java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject.awaitNanos(AbstractQueuedSynchronizer.java:2078)
	at java.util.concurrent.ScheduledThreadPoolExecutor$DelayedWorkQueue.take(ScheduledThreadPoolExecutor.java:1093)
	at java.util.concurrent.ScheduledThreadPoolExecutor$DelayedWorkQueue.take(ScheduledThreadPoolExecutor.java:809)
	at java.util.concurrent.ThreadPoolExecutor.getTask(ThreadPoolExecutor.java:1067)
	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1127)
	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
	at java.lang.Thread.run(Thread.java:745)

"elasticsearch[_client_][[timer]]" #37 daemon prio=5 os_prio=0 tid=0x00007f51a1026000 nid=0x7379 waiting on condition [0x00007f51679fa000]
   java.lang.Thread.State: TIMED_WAITING (sleeping)
	at java.lang.Thread.sleep(Native Method)
	at org.elasticsearch.threadpool.ThreadPool$EstimatedTimeThread.run(ThreadPool.java:498)

I believe these are all the relevant threads... tell me if I missed any.

I do not get any error message. A few seconds after I create the observable, the connection is usually closed due to a client timeout, or the request is executed after a few seconds and I do get the result, but it takes time.
Also, I believe the problem is the client not sending the request to the servers: I see no metrics indicating any problem and no slow-search log entries on the servers, and curling the servers directly works fine.
It seems like there is some kind of problem with the client's thread pool.
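One way to rule out the connection itself is to ask the transport client which nodes it currently considers connected; a minimal sketch (transportClient is the client built above):

import org.elasticsearch.cluster.node.DiscoveryNode;

// If this list is empty or shrinks over time, the client has lost its connections
// rather than queued requests behind a busy thread pool.
for (final DiscoveryNode node : transportClient.connectedNodes()) {
	System.out.println("connected to " + node.getAddress());
}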

Another indicator that might help: it takes a while after a server restart before the problem starts to happen, and the number of requests is pretty much irrelevant.

Another thing I found by running netstat -na | grep 9300 is that the send queue is not being flushed to the ES machines (the third column is Send-Q, 49880 bytes here):
tcp6 0 49880 CURRENT_MACHINE:49094 ELASTICSEARCH_MACHINE:9300 ESTABLISHED

For comparison, the application using the 2.3.1 transport client never reaches a Send-Q value larger than 1000... maybe a missing flush somewhere?
We've decreased the MTU to see if it helps resolve the problem.

Decreasing the MTU seems to have resolved the problem.
