Nodes do not release RAM when idle

Hi all,

I have experienced some weird behavior on my development cluster while reading an index using the Elasticsearch-Hadoop connector. I am benchmarking the read performance, by running some scripts that read the full index while tracking some metrics. All seemed fine at first, but after a couple of tests, the read performance started to degrade and the results were getting less and less reproducible.

I am not sure what the issue is, but I noticed that 2 out of 3 nodes are showing 100% RAM usage all the time (so even when idle). It is also these nodes that show almost 0 read I/O. I am not sure what is causing this and how to resolve it.

Here is a screenshot of the cluster statistics when idle:

This is a screenshot of some statistics in one of the first runs on one of the nodes, with some read I/O:

This is a screenshot of some statistics the last run on the same node. You can see read I/O being zero:

Some information about the cluster:

  • Running on EKS backed by GP3 EBS
  • Total disk space for primary ~60GB
  • 3 nodes
  • 6 shards (we are currently reducing it to 3)
  • 1 replica
  • 2 CPU, 8GB RAM, 4GB heap per node

Thanks in advance!

RAM is not heap, so given your heap is not full this just looks like the OS caching files itself which is totally normal.

If this is not what you are expecting then it'd be good to understand what you are looking for.

Well actually the issue is that the benchmarking of reading the whole index in Spark using the Elasticsearch-Hadoop connector gave extremely inconsistent results for the same settings. I expect the total time to be more or less the same for both runs. Or even faster for the later run because of caching.

I tried to find an answer why this is the case and though maybe the RAM was too full. But since you say this looks totally normal, this probably is not the problem.

Let me explain a bit more about what I observed the first run (first screenshot), and the last run (second screenshot). Both runs are using the exact same settings of ES-Hadoop.

First run:

  • Steady search rate of ~3
  • Similar high CPU rate of ~75% on all 3 nodes.
  • Average read I/O of 250 on all 3 nodes.
  • The full read was done in ~18 minutes.

Last run:

  • Very bumpy search rate with an average of ~1
  • Low CPU of ~10% on all 3 nodes.
  • Very low read I/O on one node, and 0 read I/O on others
  • The full read took more than 30 minutes and then I stopped it.

This morning I restarted the cluster and ran the tests again, and I cannot reproduce the above things anymore. I am not sure what happened, but if you have any suggestions what could cause such situations that would be very helpful in my understanding.

Another update on the situation. I have been continuing the research by repeating the same experiment with the default configurations four times, and the same thing starts to happen again.

First experiment:

Second experiment:

Third experiment:

Fourth experiment:

Could you possibly provide some more detail here on the use case. Just a quick observation, that is an extremely high IOPS value compared to an extremely low search rate for the majority of use cases (I believe). For someone to help it would be useful to know:

  1. How are these nodes hosted? Self-hosted on-prem? Self-hosted Cloud (AWS/GCP/Azure)? Elastic Cloud?
  2. What type of disk is backing the nodes?
  3. Are these nodes only running Elasticsearch, or are there other processes also running on them?
  4. More detail about your use case(s). If possible, example queries that are being run as part of your use case.
  5. How are your indices setup? I see each node roughly uses ~35GB of disk space, is this just one index with 3 shards (1 shard per node), or some other usage setup?

Yeah I am happy to provide the extra information. Note that I am benchmarking by reading fields of ALL the data using the Elasticsearch-Hadoop connector using Spark. This is (I think) why the search rate is low, and the IOPS is high. Elasticsearch-Hadoop uses the scroll API to batch through the data, and read it into Spark.

Regarding the extra information:

  1. We are having 3 nodes hosted on EKS. Each node currently has (minimum 600m- max 2vCPU), 8GB RAM, 4GB Heap
  2. The storage backing the nodes are EBS GP3 SSD volumes, which are configurable in terms of IOPS and throughput.
  3. The 3 Elasticsearch nodes are 3 different pods running in EKS. However, there are multiple pods running on the same instance. I will be checking by increasing the resource requirements if it helps in this issue.
  4. What I am doing is reading the whole index (only relevant fields) from Elasticsearch, perform expensive computations on them in Spark, and writing some calculated values back to the same index. In this stage I am benchmarking the read speed on this cluster. The query that is being fired is scroll API using:
{"query": {"match_all": {}}}
  1. We have one main index having patent related data with some nested fields and texts included. This is the development cluster so this is a slice of the actual data we are using in production. This main index in the development cluster is ~60 GB in total. We have currently 3 primary shards, one on each node. The tests above were with 1 replica.

Let me know if you need some more information.

Given the use case, I'd recommend looking at a few things:

  1. See if you can use the point in time API, per the scroll API they seem to recommend it for large searches, which appears to be your case.
  2. See about adjusting the size parameter of the search. By default, it is 10, this might be pretty small for you use case, but if your docs are large, you might not want to go to the max 10000 as this could require more RAM.
  3. See about increasing the available system RAM. Elasticsearch is able to leverage the underlying OS/file system cacheing to speed up searching. If you're low on RAM/unable to cache much of the filesystem data, it will take longer (and more IOPS) to search the data directly from disk.

Thanks for the suggestions.

  1. The Elasticsearch-Hadoop connector for Spark uses scroll API. The point in time API the connector does not implement I think.
  2. I have been testing different sizes, and indeed this changes the speed to some extend. However, my main problem is that the performance degrades as I run the experiment more often. After running it the 4th time the performance is so poor that it just seems to hang.
  3. I have increased the RAM but that does not seem to help much. One of odd things I observe is that for the fourth run, there is hardly any IOPS as compared to the first run.

Any other thoughts?

Hmm:

One of odd things I observe is that for the fourth run, there is hardly any IOPS as compared to the first run.

This could actually be the filesystem cache working here.

I'd see if while you're running the tests, what the hot threads show. They're kind of annoying to read, but should show you what the system might be doing at a more detailed level.

The other question. While we've mainly been looking at the Elasticsearch side, have you looked at the Spark side to make sure there isn't anything going on on that end of things? It is weird to me that Elasticsearch is slowing down for no apparent reason. Curious if maybe the spark end isn't getting the same "cleanup" between tests or something similar that could be causing it to slow down what it can process.

Here is the output of the hot treads API. To be honest I am not sure how to interpret this.

::: {elasticsearch-0}{QsRAhPhqRtCEhrGxMT8YEQ}{SpsJ8cueTraOY17ixEFnDw}{elasticsearch-0.elasticsearch.patentfinder-development}{10.101.3.141:9300}{cdfhilmrstw}{xpack.installed=true, ml.max_jvm_size=4294967296, ml.machine_memory=10737418240}
   Hot threads at 2022-09-14T21:37:10.675Z, interval=500ms, busiestThreads=3, ignoreIdleThreads=true:
   
   100.0% [cpu=1.0%, other=99.0%] (500ms out of 500ms) cpu usage by thread 'elasticsearch[elasticsearch-0][transport_worker][T#1]'
     2/10 snapshots sharing following 194 elements
       org.elasticsearch.xpack.security.action.user.TransportHasPrivilegesAction.resolveApplicationPrivileges(TransportHasPrivilegesAction.java:106)
       org.elasticsearch.xpack.security.action.user.TransportHasPrivilegesAction.doExecute(TransportHasPrivilegesAction.java:87)
       org.elasticsearch.xpack.security.action.user.TransportHasPrivilegesAction.doExecute(TransportHasPrivilegesAction.java:38)
       app//org.elasticsearch.action.support.TransportAction$RequestFilterChain.proceed(TransportAction.java:79)
       app//org.elasticsearch.action.support.ActionFilter$Simple.apply(ActionFilter.java:53)
       app//org.elasticsearch.action.support.TransportAction$RequestFilterChain.proceed(TransportAction.java:77)
       io.siren.federate.session.SessionTrackerActionFilter.apply(SessionTrackerActionFilter.java:58)
       app//org.elasticsearch.action.support.TransportAction$RequestFilterChain.proceed(TransportAction.java:77)
       io.siren.federate.connector.e.a.a(ConnectorFilter.java:141)
       io.siren.federate.connector.e.a$$Lambda$6571/0x0000000801c416f8.run(Unknown Source)
       io.siren.federate.connector.e.a.apply(ConnectorFilter.java:161)
       app//org.elasticsearch.action.support.TransportAction$RequestFilterChain.proceed(TransportAction.java:77)
       org.elasticsearch.xpack.security.action.filter.SecurityActionFilter.lambda$applyInternal$3(SecurityActionFilter.java:163)
       org.elasticsearch.xpack.security.action.filter.SecurityActionFilter$$Lambda$6569/0x0000000801e0b170.accept(Unknown Source)
       app//org.elasticsearch.action.ActionListener$DelegatingFailureActionListener.onResponse(ActionListener.java:245)
       org.elasticsearch.xpack.security.authz.AuthorizationService.lambda$authorizeAction$4(AuthorizationService.java:396)
       org.elasticsearch.xpack.security.authz.AuthorizationService$$Lambda$6507/0x0000000801e02498.accept(Unknown Source)
       org.elasticsearch.xpack.security.authz.AuthorizationService$AuthorizationResultListener.onResponse(AuthorizationService.java:965)
       org.elasticsearch.xpack.security.authz.AuthorizationService$AuthorizationResultListener.onResponse(AuthorizationService.java:929)
       app//org.elasticsearch.action.support.ContextPreservingActionListener.onResponse(ContextPreservingActionListener.java:31)
       org.elasticsearch.xpack.security.authz.AuthorizationService.lambda$authorizeAction$5(AuthorizationService.java:410)
       org.elasticsearch.xpack.security.authz.AuthorizationService$$Lambda$6509/0x0000000801e028d8.accept(Unknown Source)
       app//org.elasticsearch.action.ActionListener$2.onResponse(ActionListener.java:162)
       org.elasticsearch.xpack.security.authz.RBACEngine.authorizeClusterAction(RBACEngine.java:165)
       org.elasticsearch.xpack.security.authz.AuthorizationService.authorizeAction(AuthorizationService.java:400)
       org.elasticsearch.xpack.security.authz.AuthorizationService.maybeAuthorizeRunAs(AuthorizationService.java:376)
       org.elasticsearch.xpack.security.authz.AuthorizationService.lambda$authorize$1(AuthorizationService.java:261)
       org.elasticsearch.xpack.security.authz.AuthorizationService$$Lambda$6445/0x0000000801d2f570.accept(Unknown Source)
       app//org.elasticsearch.action.ActionListener$2.onResponse(ActionListener.java:162)
       app//org.elasticsearch.action.support.ContextPreservingActionListener.onResponse(ContextPreservingActionListener.java:31)
       org.elasticsearch.xpack.security.authz.RBACEngine.lambda$resolveAuthorizationInfo$0(RBACEngine.java:138)
       org.elasticsearch.xpack.security.authz.RBACEngine$$Lambda$6447/0x0000000801d2f9a0.accept(Unknown Source)
       app//org.elasticsearch.action.ActionListener$2.onResponse(ActionListener.java:162)
       org.elasticsearch.xpack.security.authz.store.CompositeRolesStore.lambda$getRoles$1(CompositeRolesStore.java:184)
       org.elasticsearch.xpack.security.authz.store.CompositeRolesStore$$Lambda$6451/0x0000000801d2fdd0.accept(Unknown Source)
       app//org.elasticsearch.action.ActionListener$2.onResponse(ActionListener.java:162)
       org.elasticsearch.xpack.core.security.authz.store.RoleReferenceIntersection.lambda$buildRole$0(RoleReferenceIntersection.java:47)
       org.elasticsearch.xpack.core.security.authz.store.RoleReferenceIntersection$$Lambda$6455/0x0000000801d3a088.accept(Unknown Source)
       app//org.elasticsearch.action.ActionListener$2.onResponse(ActionListener.java:162)
       app//org.elasticsearch.action.support.GroupedActionListener.onResponse(GroupedActionListener.java:55)
       org.elasticsearch.xpack.security.authz.store.CompositeRolesStore.buildRoleFromRoleReference(CompositeRolesStore.java:232)
       org.elasticsearch.xpack.security.authz.store.CompositeRolesStore$$Lambda$6454/0x0000000801de0220.accept(Unknown Source)
       org.elasticsearch.xpack.core.security.authz.store.RoleReferenceIntersection.lambda$buildRole$1(RoleReferenceIntersection.java:50)
       org.elasticsearch.xpack.core.security.authz.store.RoleReferenceIntersection$$Lambda$6457/0x0000000801d3a4b8.accept(Unknown Source)
       java.base@18.0.1.1/java.lang.Iterable.forEach(Iterable.java:75)
       org.elasticsearch.xpack.core.security.authz.store.RoleReferenceIntersection.buildRole(RoleReferenceIntersection.java:50)
       org.elasticsearch.xpack.security.authz.store.CompositeRolesStore.getRole(CompositeRolesStore.java:199)
       org.elasticsearch.xpack.security.authz.store.CompositeRolesStore.getRoles(CompositeRolesStore.java:174)
       org.elasticsearch.xpack.security.authz.RBACEngine.resolveAuthorizationInfo(RBACEngine.java:135)
       org.elasticsearch.xpack.security.authz.AuthorizationService.authorize(AuthorizationService.java:263)
       org.elasticsearch.xpack.security.action.filter.SecurityActionFilter.lambda$applyInternal$4(SecurityActionFilter.java:159)
       org.elasticsearch.xpack.security.action.filter.SecurityActionFilter$$Lambda$6567/0x0000000801e0ad40.accept(Unknown Source)
       app//org.elasticsearch.action.ActionListener$2.onResponse(ActionListener.java:162)
       app//org.elasticsearch.action.ActionListener$MappedActionListener.onResponse(ActionListener.java:127)
       org.elasticsearch.xpack.security.authc.AuthenticatorChain.authenticateAsync(AuthenticatorChain.java:93)
       org.elasticsearch.xpack.security.authc.AuthenticationService.authenticate(AuthenticationService.java:171)
       org.elasticsearch.xpack.security.action.filter.SecurityActionFilter.applyInternal(SecurityActionFilter.java:155)
       org.elasticsearch.xpack.security.action.filter.SecurityActionFilter.apply(SecurityActionFilter.java:114)
       app//org.elasticsearch.action.support.TransportAction$RequestFilterChain.proceed(TransportAction.java:77)
       app//org.elasticsearch.action.support.TransportAction.execute(TransportAction.java:54)
       app//org.elasticsearch.tasks.TaskManager.registerAndExecute(TaskManager.java:170)
       app//org.elasticsearch.client.internal.node.NodeClient.executeLocally(NodeClient.java:113)
       app//org.elasticsearch.client.internal.node.NodeClient.doExecute(NodeClient.java:91)
       app//org.elasticsearch.client.internal.support.AbstractClient.execute(AbstractClient.java:380)
       app//org.elasticsearch.action.ActionRequestBuilder.execute(ActionRequestBuilder.java:59)
       org.elasticsearch.xpack.security.rest.action.user.RestHasPrivilegesAction.lambda$innerPrepareRequest$1(RestHasPrivilegesAction.java:85)
       org.elasticsearch.xpack.security.rest.action.user.RestHasPrivilegesAction$$Lambda$7841/0x0000000801f7dd08.accept(Unknown Source)
       app//org.elasticsearch.rest.BaseRestHandler.handleRequest(BaseRestHandler.java:103)
       org.elasticsearch.xpack.security.rest.SecurityRestFilter.lambda$handleRequest$0(SecurityRestFilter.java:112)
       org.elasticsearch.xpack.security.rest.SecurityRestFilter$$Lambda$7192/0x0000000801f4d0f0.accept(Unknown Source)
       app//org.elasticsearch.action.ActionListener$2.onResponse(ActionListener.java:162)
       org.elasticsearch.xpack.security.authc.support.SecondaryAuthenticator.lambda$authenticateAndAttachToContext$2(SecondaryAuthenticator.java:84)
       org.elasticsearch.xpack.security.authc.support.SecondaryAuthenticator$$Lambda$7195/0x0000000801f4d740.accept(Unknown Source)
       app//org.elasticsearch.action.ActionListener$2.onResponse(ActionListener.java:162)
       org.elasticsearch.xpack.security.authc.support.SecondaryAuthenticator.authenticate(SecondaryAuthenticator.java:94)
       org.elasticsearch.xpack.security.authc.support.SecondaryAuthenticator.authenticateAndAttachToContext(SecondaryAuthenticator.java:78)
       org.elasticsearch.xpack.security.rest.SecurityRestFilter.lambda$handleRequest$2(SecurityRestFilter.java:106)
       org.elasticsearch.xpack.security.rest.SecurityRestFilter$$Lambda$7164/0x0000000801f41650.accept(Unknown Source)
       app//org.elasticsearch.action.ActionListener$2.onResponse(ActionListener.java:162)
       app//org.elasticsearch.action.ActionListener$MappedActionListener.onResponse(ActionListener.java:127)
       app//org.elasticsearch.action.ActionListener$RunBeforeActionListener.onResponse(ActionListener.java:415)
       org.elasticsearch.xpack.security.authc.AuthenticatorChain.writeAuthToContext(AuthenticatorChain.java:361)
       org.elasticsearch.xpack.security.authc.AuthenticatorChain.finishAuthentication(AuthenticatorChain.java:337)
       org.elasticsearch.xpack.security.authc.AuthenticatorChain.maybeLookupRunAsUser(AuthenticatorChain.java:202)
       org.elasticsearch.xpack.security.authc.AuthenticatorChain.lambda$doAuthenticate$1(AuthenticatorChain.java:121)
       org.elasticsearch.xpack.security.authc.AuthenticatorChain$$Lambda$7167/0x0000000801f41c90.accept(Unknown Source)
       app//org.elasticsearch.action.ActionListener$2.onResponse(ActionListener.java:162)
       org.elasticsearch.xpack.core.common.IteratingActionListener.onResponse(IteratingActionListener.java:141)
       org.elasticsearch.xpack.security.authc.AuthenticatorChain.lambda$getAuthenticatorConsumer$4(AuthenticatorChain.java:188)
       org.elasticsearch.xpack.security.authc.AuthenticatorChain$$Lambda$7172/0x0000000801f42738.accept(Unknown Source)
       app//org.elasticsearch.action.ActionListener$2.onResponse(ActionListener.java:162)
       org.elasticsearch.xpack.security.authc.RealmsAuthenticator.lambda$consumeToken$3(RealmsAuthenticator.java:217)
       org.elasticsearch.xpack.security.authc.RealmsAuthenticator$$Lambda$7174/0x0000000801f42b70.accept(Unknown Source)
       app//org.elasticsearch.action.ActionListener$2.onResponse(ActionListener.java:162)
       app//org.elasticsearch.action.support.ContextPreservingActionListener.onResponse(ContextPreservingActionListener.java:31)
       org.elasticsearch.xpack.core.common.IteratingActionListener.onResponse(IteratingActionListener.java:141)
       org.elasticsearch.xpack.security.authc.RealmsAuthenticator.lambda$consumeToken$0(RealmsAuthenticator.java:162)
       org.elasticsearch.xpack.security.authc.RealmsAuthenticator$$Lambda$7177/0x0000000801f42fa0.accept(Unknown Source)
       app//org.elasticsearch.action.ActionListener$2.onResponse(ActionListener.java:162)
       org.elasticsearch.xpack.security.authc.support.CachingUsernamePasswordRealm.lambda$authenticateWithCache$1(CachingUsernamePasswordRealm.java:155)
       org.elasticsearch.xpack.security.authc.support.CachingUsernamePasswordRealm$$Lambda$7406/0x0000000801f7c648.accept(Unknown Source)
       app//org.elasticsearch.action.ActionListener$2.onResponse(ActionListener.java:162)
       org.elasticsearch.xpack.security.authc.support.CachingUsernamePasswordRealm.handleCachedAuthentication(CachingUsernamePasswordRealm.java:242)
       org.elasticsearch.xpack.security.authc.support.CachingUsernamePasswordRealm.lambda$authenticateWithCache$2(CachingUsernamePasswordRealm.java:139)
       org.elasticsearch.xpack.security.authc.support.CachingUsernamePasswordRealm$$Lambda$7404/0x0000000801f7c218.accept(Unknown Source)
       app//org.elasticsearch.action.ActionListener$2.onResponse(ActionListener.java:162)
       app//org.elasticsearch.common.util.concurrent.ListenableFuture.notifyListenerDirectly(ListenableFuture.java:113)
       app//org.elasticsearch.common.util.concurrent.ListenableFuture.addListener(ListenableFuture.java:55)
       org.elasticsearch.xpack.security.authc.support.CachingUsernamePasswordRealm.authenticateWithCache(CachingUsernamePasswordRealm.java:134)
       org.elasticsearch.xpack.security.authc.support.CachingUsernamePasswordRealm.authenticate(CachingUsernamePasswordRealm.java:105)
       org.elasticsearch.xpack.security.authc.RealmsAuthenticator.lambda$consumeToken$2(RealmsAuthenticator.java:146)
       org.elasticsearch.xpack.security.authc.RealmsAuthenticator$$Lambda$7173/0x0000000801f42948.accept(Unknown Source)
       org.elasticsearch.xpack.core.common.IteratingActionListener.run(IteratingActionListener.java:117)
       org.elasticsearch.xpack.security.authc.RealmsAuthenticator.consumeToken(RealmsAuthenticator.java:233)
       org.elasticsearch.xpack.security.authc.RealmsAuthenticator.authenticate(RealmsAuthenticator.java:82)
       org.elasticsearch.xpack.security.authc.AuthenticatorChain.lambda$getAuthenticatorConsumer$5(AuthenticatorChain.java:180)
       org.elasticsearch.xpack.security.authc.AuthenticatorChain$$Lambda$7169/0x0000000801f420c0.accept(Unknown Source)
       org.elasticsearch.xpack.core.common.IteratingActionListener.onResponse(IteratingActionListener.java:135)
       org.elasticsearch.xpack.security.authc.AuthenticatorChain.lambda$getAuthenticatorConsumer$5(AuthenticatorChain.java:158)
       org.elasticsearch.xpack.security.authc.AuthenticatorChain$$Lambda$7169/0x0000000801f420c0.accept(Unknown Source)
       org.elasticsearch.xpack.core.common.IteratingActionListener.onResponse(IteratingActionListener.java:135)
       org.elasticsearch.xpack.security.authc.AuthenticatorChain.lambda$getAuthenticatorConsumer$5(AuthenticatorChain.java:158)
       org.elasticsearch.xpack.security.authc.AuthenticatorChain$$Lambda$7169/0x0000000801f420c0.accept(Unknown Source)
       org.elasticsearch.xpack.core.common.IteratingActionListener.onResponse(IteratingActionListener.java:135)
       org.elasticsearch.xpack.security.authc.AuthenticatorChain.lambda$getAuthenticatorConsumer$5(AuthenticatorChain.java:158)
       org.elasticsearch.xpack.security.authc.AuthenticatorChain$$Lambda$7169/0x0000000801f420c0.accept(Unknown Source)
       org.elasticsearch.xpack.core.common.IteratingActionListener.run(IteratingActionListener.java:117)
       org.elasticsearch.xpack.security.authc.AuthenticatorChain.doAuthenticate(AuthenticatorChain.java:136)
       org.elasticsearch.xpack.security.authc.AuthenticatorChain.authenticateAsync(AuthenticatorChain.java:95)
       org.elasticsearch.xpack.security.authc.AuthenticationService.authenticate(AuthenticationService.java:149)
       org.elasticsearch.xpack.security.authc.AuthenticationService.authenticate(AuthenticationService.java:127)
       org.elasticsearch.xpack.security.rest.SecurityRestFilter.handleRequest(SecurityRestFilter.java:100)
       app//org.elasticsearch.rest.RestController.dispatchRequest(RestController.java:390)
       app//org.elasticsearch.rest.RestController.tryAllHandlers(RestController.java:469)
       app//org.elasticsearch.rest.RestController.dispatchRequest(RestController.java:305)
       app//org.elasticsearch.http.AbstractHttpServerTransport.dispatchRequest(AbstractHttpServerTransport.java:384)
       app//org.elasticsearch.http.AbstractHttpServerTransport.handleIncomingRequest(AbstractHttpServerTransport.java:463)
       app//org.elasticsearch.http.AbstractHttpServerTransport.incomingRequest(AbstractHttpServerTransport.java:358)
       org.elasticsearch.http.netty4.Netty4HttpRequestHandler.channelRead0(Netty4HttpRequestHandler.java:35)
       org.elasticsearch.http.netty4.Netty4HttpRequestHandler.channelRead0(Netty4HttpRequestHandler.java:19)
       io.netty.channel.SimpleChannelInboundHandler.channelRead(SimpleChannelInboundHandler.java:99)
       io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:379)
       io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:365)
       io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:357)
       org.elasticsearch.http.netty4.Netty4HttpPipeliningHandler.channelRead(Netty4HttpPipeliningHandler.java:48)
       io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:379)
       io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:365)
       io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:357)
       io.netty.handler.codec.MessageToMessageDecoder.channelRead(MessageToMessageDecoder.java:103)
       io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:379)
       io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:365)
       io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:357)
       io.netty.handler.codec.MessageToMessageDecoder.channelRead(MessageToMessageDecoder.java:103)
       io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:379)
       io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:365)
       io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:357)
       io.netty.handler.codec.MessageToMessageDecoder.channelRead(MessageToMessageDecoder.java:103)
       io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:379)
       io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:365)
       io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:357)
       io.netty.handler.codec.ByteToMessageDecoder.fireChannelRead(ByteToMessageDecoder.java:327)
       io.netty.handler.codec.ByteToMessageDecoder.channelRead(ByteToMessageDecoder.java:299)
       io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:379)
       io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:365)
       io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:357)
       io.netty.handler.timeout.IdleStateHandler.channelRead(IdleStateHandler.java:286)
       io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:379)
       io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:365)
       io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:357)
       io.netty.handler.codec.MessageToMessageDecoder.channelRead(MessageToMessageDecoder.java:103)
       io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:379)
       io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:365)
       io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:357)
       io.netty.handler.ssl.SslHandler.unwrap(SslHandler.java:1372)
       io.netty.handler.ssl.SslHandler.decodeJdkCompatible(SslHandler.java:1235)
       io.netty.handler.ssl.SslHandler.decode(SslHandler.java:1284)
       io.netty.handler.codec.ByteToMessageDecoder.decodeRemovalReentryProtection(ByteToMessageDecoder.java:510)
       io.netty.handler.codec.ByteToMessageDecoder.callDecode(ByteToMessageDecoder.java:449)
       io.netty.handler.codec.ByteToMessageDecoder.channelRead(ByteToMessageDecoder.java:279)
       io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:379)
       io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:365)
       io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:357)
       io.netty.channel.DefaultChannelPipeline$HeadContext.channelRead(DefaultChannelPipeline.java:1410)
       io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:379)
       io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:365)
       io.netty.channel.DefaultChannelPipeline.fireChannelRead(DefaultChannelPipeline.java:919)
       io.netty.channel.nio.AbstractNioByteChannel$NioByteUnsafe.read(AbstractNioByteChannel.java:166)
       io.netty.channel.nio.NioEventLoop.processSelectedKey(NioEventLoop.java:722)
       io.netty.channel.nio.NioEventLoop.processSelectedKeysPlain(NioEventLoop.java:623)
       io.netty.channel.nio.NioEventLoop.processSelectedKeys(NioEventLoop.java:586)
       io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:496)
       io.netty.util.concurrent.SingleThreadEventExecutor$4.run(SingleThreadEventExecutor.java:986)
       io.netty.util.internal.ThreadExecutorMap$2.run(ThreadExecutorMap.java:74)
       java.base@18.0.1.1/java.lang.Thread.run(Thread.java:833)
     2/10 snapshots sharing following 9 elements
       java.base@18.0.1.1/sun.nio.ch.EPoll.wait(Native Method)
       java.base@18.0.1.1/sun.nio.ch.EPollSelectorImpl.doSelect(EPollSelectorImpl.java:118)
       java.base@18.0.1.1/sun.nio.ch.SelectorImpl.lockAndDoSelect(SelectorImpl.java:129)
       java.base@18.0.1.1/sun.nio.ch.SelectorImpl.select(SelectorImpl.java:146)
       io.netty.channel.nio.NioEventLoop.select(NioEventLoop.java:813)
       io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:460)
       io.netty.util.concurrent.SingleThreadEventExecutor$4.run(SingleThreadEventExecutor.java:986)
       io.netty.util.internal.ThreadExecutorMap$2.run(ThreadExecutorMap.java:74)
       java.base@18.0.1.1/java.lang.Thread.run(Thread.java:833)
   
   100.0% [cpu=0.7%, other=99.3%] (500ms out of 500ms) cpu usage by thread 'elasticsearch[elasticsearch-0][transport_worker][T#4]'
     2/10 snapshots sharing following 3 elements
       io.netty.util.concurrent.SingleThreadEventExecutor$4.run(SingleThreadEventExecutor.java:986)
       io.netty.util.internal.ThreadExecutorMap$2.run(ThreadExecutorMap.java:74)
       java.base@18.0.1.1/java.lang.Thread.run(Thread.java:833)

::: {elasticsearch-1}{OSuHW8nNRj6dftPNKI2_1A}{N9HT6iKjQ6GOopZuoWeNow}{elasticsearch-1.elasticsearch.patentfinder-development}{10.101.5.76:9300}{cdfhilmrstw}{ml.machine_memory=10737418240, ml.max_jvm_size=4294967296, xpack.installed=true}
   Hot threads at 2022-09-14T21:37:10.675Z, interval=500ms, busiestThreads=3, ignoreIdleThreads=true:
   
   100.0% [cpu=1.9%, other=98.1%] (500ms out of 500ms) cpu usage by thread 'elasticsearch[elasticsearch-1][transport_worker][T#5]'
     2/10 snapshots sharing following 3 elements
       io.netty.util.concurrent.SingleThreadEventExecutor$4.run(SingleThreadEventExecutor.java:986)
       io.netty.util.internal.ThreadExecutorMap$2.run(ThreadExecutorMap.java:74)
       java.base@18.0.1.1/java.lang.Thread.run(Thread.java:833)
   
   100.0% [cpu=0.5%, other=99.5%] (500ms out of 500ms) cpu usage by thread 'elasticsearch[elasticsearch-1][transport_worker][T#1]'
     2/10 snapshots sharing following 3 elements
       io.netty.util.concurrent.SingleThreadEventExecutor$4.run(SingleThreadEventExecutor.java:986)
       io.netty.util.internal.ThreadExecutorMap$2.run(ThreadExecutorMap.java:74)
       java.base@18.0.1.1/java.lang.Thread.run(Thread.java:833)

::: {elasticsearch-2}{qkWa40aOTK216A78fozIpA}{MNsepxpUQIaa1K4oQAsd1w}{elasticsearch-2.elasticsearch.patentfinder-development}{10.101.4.179:9300}{cdfhilmrstw}{ml.machine_memory=10737418240, xpack.installed=true, ml.max_jvm_size=4294967296}
   Hot threads at 2022-09-14T21:37:10.675Z, interval=500ms, busiestThreads=3, ignoreIdleThreads=true:

Regarding your comment that the problem could be on the Spark/Elasticsearch-Hadoop side. I think this is unlikely, but I think it is indeed good to rule this out completely. Tomorrow morning I will write some code to use the scroll API to fetch all data using the normal API to see if I can reproduce this error also without Spark.

Oh, and regarding this:

This could actually be the filesystem cache working here.

I was thinking the same. However, I would expect the job to be finished earlier. Or could it somehow be that as RAM fills up with filesystem cache (all nodes are near 100%), that somehow the process gets slower?

Also, another thing to mention here is that when I restart the cluster, the problems persist. However, when I leave the cluster untouched overnight, and start the job again in the morning, everything seems to be working great again.

No, it shouldn't. The memory occupied by the file system cache will be available and freed up if needed. This sounds quite normal.

I have seen this type of behaviour when resources like CPU cycles and IOPS are tied to credits and not necessarily consistently available. EBS GP3 SSD volumes should as far as I know have a fixed floor and not be subject to this. Is it possible CPU allocation might be metered and subject to throttling?

I have double checked if anything can be limiting it. However, our cluster is running on Kubernetes, and we have assigned 4-6 vCPU to each of the pods. Also the underlying EC2 instances are of type *.2xlarge, which don't have CPU credits.

Would it somehow be possible that the threadpool gets exhausted somehow after some time, and/or maybe some processes are keeping on running? I am trying to understand the hot_thread API. I notice on each of the nodes null_Worker-* threads. I am not sure what this is, but I haven't been seeing that before.

@Christian_Dahlqvist @BenB196

Another thing we observed which might be of interest. If we examine the GET /_nodes/hot_threads?threads=999 during the initial runs, we can actually see some [search] threads consuming CPU as expected. However, as we repeat the experiment a couple of times, the performance degrades and the total time increases a lot. If we examine the GET /_nodes/hot_threads?threads=999, we don't see any [search] threads anymore. If I request also the idle threads by GET /_nodes/hot_threads?threads=999&ignore_idle_threads=false, I can see a lot of [search] threads like this:

0.0% [cpu=0.0%, other=0.0%] (0s out of 500ms) cpu usage by thread 'elasticsearch[elasticsearch-0][search][T#8]'
     10/10 snapshots sharing following 13 elements
       java.base@18.0.1.1/jdk.internal.misc.Unsafe.park(Native Method)
       java.base@18.0.1.1/java.util.concurrent.locks.LockSupport.park(LockSupport.java:341)
       java.base@18.0.1.1/java.util.concurrent.LinkedTransferQueue$Node.block(LinkedTransferQueue.java:470)
       java.base@18.0.1.1/java.util.concurrent.ForkJoinPool.unmanagedBlock(ForkJoinPool.java:3464)
       java.base@18.0.1.1/java.util.concurrent.ForkJoinPool.managedBlock(ForkJoinPool.java:3435)
       java.base@18.0.1.1/java.util.concurrent.LinkedTransferQueue.awaitMatch(LinkedTransferQueue.java:669)
       java.base@18.0.1.1/java.util.concurrent.LinkedTransferQueue.xfer(LinkedTransferQueue.java:616)
       java.base@18.0.1.1/java.util.concurrent.LinkedTransferQueue.take(LinkedTransferQueue.java:1286)
       app//org.elasticsearch.common.util.concurrent.SizeBlockingQueue.take(SizeBlockingQueue.java:152)
       java.base@18.0.1.1/java.util.concurrent.ThreadPoolExecutor.getTask(ThreadPoolExecutor.java:1062)
       java.base@18.0.1.1/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1122)
       java.base@18.0.1.1/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:635)
       java.base@18.0.1.1/java.lang.Thread.run(Thread.java:833)

That's exactly what an idle thread looks like :slight_smile:

Given the low CPU usage, low IO traffic, and lack of any active threads, it does rather look like Elasticsearch is just waiting to be given some work to do and the bottleneck is elsewhere.

You could try using the REST request tracer to log every single request and the corresponding response. That would give a good indication whether the slowness is within Elasticsearch or without.

@DavidTurner Thanks for the suggestion! We examined the requests and indeed found out that response times within ES are quick for the scroll API. We found that requests, however, are coming in too slow. Somehow something is limiting the rate at which API calls to ES can be made. We are not sure yet what it is, but we are examining the different components of the infrastructure. Will update if I know more.

1 Like

Alright, finally we have tracked down the issue. I will quickly summarize all things discussed here for people that are having similar issues.

  • We were seeing extreme performance degradation when reading a full index all at once using Spark (Elasticsearch-Hadoop connector). At first all seems good and performance is good. After a while search rate start to drop and there is hardly any evidence of read I/O.
  • We were seeing high RAM usage on our ES cluster. This is NOT an issue at all. (@warkolm @Christian_Dahlqvist)
  • Changing read parameters such as es.scroll.size and es.input.max.docs.per.partition did change search rates, but did NOT mitigate any overall performance degradation.
  • We were observing that the read performance using Spark came back after a while of the cluster being idle.
  • We were able to confirm that the problem was not on Elasticsearch side by using the REST request tracer (@DavidTurner). This showed that request-response times of Elasticsearch were great.
  • We eventually found that our ingress controller of our Kubernetes cluster was underperforming, causing a bottleneck. Probably this is caused by Spark opening a lot of connections, but this we need to verify.

Thanks for the help everyone!

5 Likes