The Elasticsearch cluster with the reported issue has 86 data nodes and 3 dedicated master nodes, each with a 32 GB heap, 64 to 128 GB of OS memory, and 16 to 32 CPU cores.
We have two ES clusters with a similar setup and the same data; we only observe the issue in one of them.
Here is a sample error log from the client side when the issue occurs; the es-hadoop client library is used (a sketch of the write path follows the stack trace).
[2024-06-05 20:48:20,417] ERROR Node [10.246.13.230:9200] failed (java.net.SocketTimeoutException: Read timed out); selected next node [10.246.12.178:9200] (org.elasticsearch.hadoop.rest.NetworkClient)
[2024-06-05 21:07:40,373] ERROR Exception in task 9.0 in stage 551.0 (TID 49607) (org.apache.spark.executor.Executor)
org.elasticsearch.hadoop.EsHadoopException: Could not get a Transport from the Transport Pool for host [10.246.13.229:9200]
at org.elasticsearch.hadoop.rest.pooling.PooledHttpTransportFactory.borrowFrom(PooledHttpTransportFactory.java:110) ~[spark-job-1.1.35.jar:?]
at org.elasticsearch.hadoop.rest.pooling.PooledHttpTransportFactory.create(PooledHttpTransportFactory.java:56) ~[spark-job-1.1.35.jar:?]
at org.elasticsearch.hadoop.rest.NetworkClient.selectNextNode(NetworkClient.java:99) ~[spark-job-1.1.35.jar:?]
at org.elasticsearch.hadoop.rest.NetworkClient.<init>(NetworkClient.java:82) ~[spark-job-1.1.35.jar:?]
at org.elasticsearch.hadoop.rest.NetworkClient.<init>(NetworkClient.java:58) ~[spark-job-1.1.35.jar:?]
at org.elasticsearch.hadoop.rest.RestClient.<init>(RestClient.java:110) ~[spark-job-1.1.35.jar:?]
at org.elasticsearch.hadoop.rest.InitializationUtils.filterNonDataNodesIfNeeded(InitializationUtils.java:154) ~[spark-job-1.1.35.jar:?]
at org.elasticsearch.hadoop.rest.RestService.createWriter(RestService.java:587) ~[spark-job-1.1.35.jar:?]
at org.elasticsearch.spark.rdd.EsRDDWriter.write(EsRDDWriter.scala:71) ~[spark-job-1.1.35.jar:?]
at org.elasticsearch.spark.rdd.EsSpark$.$anonfun$doSaveToEs$1(EsSpark.scala:108) ~[spark-job-1.1.35.jar:?]
at org.elasticsearch.spark.rdd.EsSpark$.$anonfun$doSaveToEs$1$adapted(EsSpark.scala:108) ~[spark-job-1.1.35.jar:?]
at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:90) ~[spark-core_2.12-3.3.0.jar:3.3.0]
at org.apache.spark.scheduler.Task.run(Task.scala:136) ~[spark-core_2.12-3.3.0.jar:3.3.0]
at org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$3(Executor.scala:548) ~[spark-core_2.12-3.3.0.jar:3.3.0]
at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1504) ~[spark-core_2.12-3.3.0.jar:3.3.0]
at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:551) ~[spark-core_2.12-3.3.0.jar:3.3.0]
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128) ~[?:?]
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628) ~[?:?]
at java.lang.Thread.run(Thread.java:829) ~[?:?]
Caused by: org.elasticsearch.hadoop.EsHadoopException: java.net.SocketTimeoutException: Read timed out
at org.elasticsearch.hadoop.mr.security.HadoopUser.doAs(HadoopUser.java:68) ~[spark-job-1.1.35.jar:?]
at org.elasticsearch.hadoop.rest.commonshttp.CommonsHttpTransport.execute(CommonsHttpTransport.java:681) ~[spark-job-1.1.35.jar:?]
at org.elasticsearch.hadoop.rest.pooling.TransportPool.validate(TransportPool.java:102) ~[spark-job-1.1.35.jar:?]
at org.elasticsearch.hadoop.rest.pooling.TransportPool.borrowTransport(TransportPool.java:131) ~[spark-job-1.1.35.jar:?]
at org.elasticsearch.hadoop.rest.pooling.PooledHttpTransportFactory.borrowFrom(PooledHttpTransportFactory.java:107) ~[spark-job-1.1.35.jar:?]
... 18 more
Caused by: java.net.SocketTimeoutException: Read timed out
at java.net.SocketInputStream.socketRead0(Native Method) ~[?:?]
at java.net.SocketInputStream.socketRead(SocketInputStream.java:115) ~[?:?]
at java.net.SocketInputStream.read(SocketInputStream.java:168) ~[?:?]
at java.net.SocketInputStream.read(SocketInputStream.java:140) ~[?:?]
at sun.security.ssl.SSLSocketInputRecord.read(SSLSocketInputRecord.java:484) ~[?:?]
at sun.security.ssl.SSLSocketInputRecord.readHeader(SSLSocketInputRecord.java:478) ~[?:?]
at sun.security.ssl.SSLSocketInputRecord.bytesInCompletePacket(SSLSocketInputRecord.java:70) ~[?:?]
at sun.security.ssl.SSLSocketImpl.readApplicationRecord(SSLSocketImpl.java:1459) ~[?:?]
at sun.security.ssl.SSLSocketImpl$AppInputStream.read(SSLSocketImpl.java:1070) ~[?:?]
at java.io.BufferedInputStream.fill(BufferedInputStream.java:252) ~[?:?]
at java.io.BufferedInputStream.read(BufferedInputStream.java:271) ~[?:?]
at org.elasticsearch.hadoop.thirdparty.apache.commons.httpclient.HttpParser.readRawLine(HttpParser.java:77) ~[spark-job-1.1.35.jar:?]
at org.elasticsearch.hadoop.thirdparty.apache.commons.httpclient.HttpParser.readLine(HttpParser.java:105) ~[spark-job-1.1.35.jar:?]
at org.elasticsearch.hadoop.thirdparty.apache.commons.httpclient.HttpConnection.readLine(HttpConnection.java:1115) ~[spark-job-1.1.35.jar:?]
at org.elasticsearch.hadoop.thirdparty.apache.commons.httpclient.HttpMethodBase.readStatusLine(HttpMethodBase.java:1832) ~[spark-job-1.1.35.jar:?]
at org.elasticsearch.hadoop.thirdparty.apache.commons.httpclient.HttpMethodBase.readResponse(HttpMethodBase.java:1590) ~[spark-job-1.1.35.jar:?]
at org.elasticsearch.hadoop.thirdparty.apache.commons.httpclient.HttpMethodBase.execute(HttpMethodBase.java:995) ~[spark-job-1.1.35.jar:?]
at org.elasticsearch.hadoop.thirdparty.apache.commons.httpclient.HttpMethodDirector.executeWithRetry(HttpMethodDirector.java:397) ~[spark-job-1.1.35.jar:?]
at org.elasticsearch.hadoop.thirdparty.apache.commons.httpclient.HttpMethodDirector.executeMethod(HttpMethodDirector.java:170) ~[spark-job-1.1.35.jar:?]
at org.elasticsearch.hadoop.thirdparty.apache.commons.httpclient.HttpClient.executeMethod(HttpClient.java:396) ~[spark-job-1.1.35.jar:?]
at org.elasticsearch.hadoop.thirdparty.apache.commons.httpclient.HttpClient.executeMethod(HttpClient.java:324) ~[spark-job-1.1.35.jar:?]
at org.elasticsearch.hadoop.rest.commonshttp.CommonsHttpTransport.doExecute(CommonsHttpTransport.java:717) ~[spark-job-1.1.35.jar:?]
at org.elasticsearch.hadoop.rest.commonshttp.CommonsHttpTransport.access$200(CommonsHttpTransport.java:98) ~[spark-job-1.1.35.jar:?]
at org.elasticsearch.hadoop.rest.commonshttp.CommonsHttpTransport$2.run(CommonsHttpTransport.java:684) ~[spark-job-1.1.35.jar:?]
at java.security.AccessController.doPrivileged(Native Method) ~[?:?]
at javax.security.auth.Subject.doAs(Subject.java:423) ~[?:?]
at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1878) ~[hadoop-client-api-3.3.3.jar:?]
at org.elasticsearch.hadoop.mr.security.HadoopUser.doAs(HadoopUser.java:66) ~[spark-job-1.1.35.jar:?]
at org.elasticsearch.hadoop.rest.commonshttp.CommonsHttpTransport.execute(CommonsHttpTransport.java:681) ~[spark-job-1.1.35.jar:?]
at org.elasticsearch.hadoop.rest.pooling.TransportPool.validate(TransportPool.java:102) ~[spark-job-1.1.35.jar:?]
at org.elasticsearch.hadoop.rest.pooling.TransportPool.borrowTransport(TransportPool.java:131) ~[spark-job-1.1.35.jar:?]
at org.elasticsearch.hadoop.rest.pooling.PooledHttpTransportFactory.borrowFrom(PooledHttpTransportFactory.java:107) ~[spark-job-1.1.35.jar:?]
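The failing call is in the bulk write path of the Spark job (EsRDDWriter / EsSpark.doSaveToEs in the trace above), and in this trace the read timeout actually fires while the connector validates a pooled transport (TransportPool.validate) before the write is sent. For context, here is a minimal sketch of how a job writes to the cluster through es-hadoop; the node addresses, index name, and setting values are illustrative placeholders, not our exact configuration:

```scala
import org.apache.spark.{SparkConf, SparkContext}
import org.elasticsearch.spark._ // adds saveToEs to RDDs

// Illustrative configuration only; addresses, index, and values are placeholders.
val conf = new SparkConf()
  .setAppName("dummy_spark_job_name")
  .set("es.nodes", "10.246.13.230,10.246.12.178") // placeholder data node IPs
  .set("es.port", "9200")
  .set("es.net.ssl", "true")             // the cluster is behind TLS
  .set("es.http.timeout", "1m")          // HTTP read timeout (default 1m)
  .set("es.batch.size.entries", "1000")  // bulk flush threshold, in documents

val sc = new SparkContext(conf)

val docs = sc.parallelize(Seq(
  Map("event" -> "sample", "ts" -> System.currentTimeMillis())
))

// Bulk-indexes the RDD; this is the path where the SocketTimeoutException surfaces.
docs.saveToEs("fdr_6_t2_2024-05-25-000002")
```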
Checking a few of the servers on the list of nodes that timed out, the interesting finding is that the ES nodes are not very busy: heap usage is between 50% and 70%, and CPU usage is below 30%.
Checking the task list, we found that some actions were taking a very long time to complete. In particular, we saw high running_time values for task actions like the following (two sample task entries are excerpted after the list, and a sketch of the query we use follows the excerpts).
- indices:admin/seq_no/global_checkpoint_sync
- cluster:monitor/nodes/info
"U6wDSlMoTF2ipBgvkNPklQ:3186347861" : {
"node" : "U6wDSlMoTF2ipBgvkNPklQ",
"id" : 3186347861,
"type" : "transport",
"action" : "cluster:monitor/nodes/info",
"description" : "",
"start_time" : "2024-06-06T14:06:20.130Z",
"start_time_in_millis" : 1717682780130,
"running_time" : "8.3m",
"running_time_in_nanos" : 500487977174,
"cancellable" : false,
"headers" : {
"X-Opaque-Id" : "[spark] [root] [dummy_spark_job_name]"
}
}
"U6wDSlMoTF2ipBgvkNPklQ:3186411368" : {
"node" : "U6wDSlMoTF2ipBgvkNPklQ",
"id" : 3186411368,
"type" : "direct",
"action" : "indices:admin/seq_no/global_checkpoint_sync[p]",
"status" : {
"phase" : "primary"
},
"description" : "GlobalCheckpointSyncAction.Request{shardId=[fdr_6_t2_2024-05-25-000002][10], timeout=1m, index='fdr_6_t2_2024-05-25-000002', waitForActiveShards=1}",
"start_time" : "2024-06-06T14:10:32.443Z",
"start_time_in_millis" : 1717683032443,
"running_time" : "4.1m",
"running_time_in_nanos" : 248174192644,
"cancellable" : false,
"parent_task_id" : "U6wDSlMoTF2ipBgvkNPklQ:3186411367",
"headers" : {
"X-Opaque-Id" : "[spark] [root] [dummy_spark_job_name]"
}
},
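The task excerpts above come from the task management API. Here is a minimal sketch of the query we run to pull them, assuming plain HTTP without authentication and placeholder node coordinates (the real cluster sits behind TLS, so the actual call needs the appropriate client setup):

```scala
import scala.io.Source

// Placeholder coordinates; the real cluster uses HTTPS and authentication.
val esNode = "http://10.246.13.230:9200"

// Detailed task list, filtered to the two slow action types we keep seeing.
val url = s"$esNode/_tasks?detailed=true&actions=" +
  "cluster:monitor/nodes/info*,indices:admin/seq_no/global_checkpoint_sync*"

// Inspect running_time / running_time_in_nanos per task in the response.
println(Source.fromURL(url).mkString)
```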
In the server log, we noticed warnings about slow transmission of the node info request to the remote host, which seems to be related to the long running_time values in the task list.
[2024-06-06T14:21:04,058][WARN ][o.e.t.OutboundHandler ] [datanode-es-jetstream-prd1-sbq8.us-east1-c.c.spcstg1-sedjetstream.internal] sending transport message [Request{cluster:monitor/nodes/info[n]}{2326967899}{false}{false}{false}] of size [23512] on [Netty4TcpChannel{localAddress=/10.246.13.39:37870, remoteAddress=10.246.12.140/10.246.12.140:9300, profile=default}] took [898985ms] which is above the warn threshold of [5000ms] with success [false]
[2024-06-06T14:21:04,058][WARN ][o.e.t.OutboundHandler ] [datanode-es-jetstream-prd1-sbq8.us-east1-c.c.spcstg1-sedjetstream.internal] sending transport message [Request{cluster:monitor/nodes/info[n]}{2326968166}{false}{false}{false}] of size [23509] on [Netty4TcpChannel{localAddress=/10.246.13.39:37870, remoteAddress=10.246.12.140/10.246.12.140:9300, profile=default}] took [893982ms] which is above the warn threshold of [5000ms] with success [false]
From the hot threads dump, we could see that the node info request was being processed on a transport worker thread, which could possibly slow down all network operations on that node. Could this explain the SocketTimeoutException for the bulk indexing requests to the same node? The relevant hot threads excerpt is below; a sketch of how it can be captured follows the excerpt.
100.0% [cpu=2.4%, other=97.6%] (500ms out of 500ms) cpu usage by thread 'elasticsearch[datanode-es-jetstream-prd1-gwc6.us-east1-c.c.spcstg1-sedjetstream.internal][transport_worker][T#8]'
unique snapshot
java.base@18/java.lang.String.intern(Native Method)
app//org.elasticsearch.cluster.node.DiscoveryNode.<init>(DiscoveryNode.java:305)
app//org.elasticsearch.action.support.nodes.BaseNodesRequest$$Lambda$6155/0x0000000801aa7548.read(Unknown Source)
app//org.elasticsearch.common.io.stream.StreamInput.readArray(StreamInput.java:1023)
app//org.elasticsearch.common.io.stream.StreamInput.readOptionalArray(StreamInput.java:1029)
app//org.elasticsearch.action.support.nodes.BaseNodesRequest.<init>(BaseNodesRequest.java:46)
app//org.elasticsearch.action.admin.cluster.node.info.NodesInfoRequest.<init>(NodesInfoRequest.java:38)
app//org.elasticsearch.action.admin.cluster.node.info.TransportNodesInfoAction$NodeInfoRequest.<init>(TransportNodesInfoAction.java:102)
app//org.elasticsearch.action.admin.cluster.node.info.TransportNodesInfoAction$$Lambda$5015/0x00000008017fe080.read(Unknown Source)
app//org.elasticsearch.transport.RequestHandlerRegistry.newRequest(RequestHandlerRegistry.java:54)
app//org.elasticsearch.transport.InboundHandler.handleRequest(InboundHandler.java:229)
app//org.elasticsearch.transport.InboundHandler.messageReceived(InboundHandler.java:106)
app//org.elasticsearch.transport.InboundHandler.inboundMessage(InboundHandler.java:88)
app//org.elasticsearch.transport.TcpTransport.inboundMessage(TcpTransport.java:743)
org.elasticsearch.transport.netty4.Netty4MessageChannelHandler$$Lambda$5916/0x00000008019ff178.accept(Unknown Source)
app//org.elasticsearch.transport.InboundPipeline.forwardFragments(InboundPipeline.java:147)
app//org.elasticsearch.transport.InboundPipeline.doHandleBytes(InboundPipeline.java:119)
app//org.elasticsearch.transport.InboundPipeline.handleBytes(InboundPipeline.java:84)
org.elasticsearch.transport.netty4.Netty4MessageChannelHandler.channelRead(Netty4MessageChannelHandler.java:71)
io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:379)
io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:365)
io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:357)
io.netty.handler.logging.LoggingHandler.channelRead(LoggingHandler.java:280)
io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:379)
io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:365)
io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:357)
io.netty.handler.codec.MessageToMessageDecoder.channelRead(MessageToMessageDecoder.java:103)
io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:379)
io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:365)
io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:357)
io.netty.handler.ssl.SslHandler.unwrap(SslHandler.java:1374)
io.netty.handler.ssl.SslHandler.decodeJdkCompatible(SslHandler.java:1237)
io.netty.handler.ssl.SslHandler.decode(SslHandler.java:1286)
io.netty.handler.codec.ByteToMessageDecoder.decodeRemovalReentryProtection(ByteToMessageDecoder.java:507)
io.netty.handler.codec.ByteToMessageDecoder.callDecode(ByteToMessageDecoder.java:446)
io.netty.handler.codec.ByteToMessageDecoder.channelRead(ByteToMessageDecoder.java:276)
io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:379)
io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:365)
io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:357)
io.netty.channel.DefaultChannelPipeline$HeadContext.channelRead(DefaultChannelPipeline.java:1410)
io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:379)
io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:365)
io.netty.channel.DefaultChannelPipeline.fireChannelRead(DefaultChannelPipeline.java:919)
io.netty.channel.nio.AbstractNioByteChannel$NioByteUnsafe.read(AbstractNioByteChannel.java:166)
io.netty.channel.nio.NioEventLoop.processSelectedKey(NioEventLoop.java:719)
io.netty.channel.nio.NioEventLoop.processSelectedKeysPlain(NioEventLoop.java:620)
io.netty.channel.nio.NioEventLoop.processSelectedKeys(NioEventLoop.java:583)
io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:493)
io.netty.util.concurrent.SingleThreadEventExecutor$4.run(SingleThreadEventExecutor.java:986)
io.netty.util.internal.ThreadExecutorMap$2.run(ThreadExecutorMap.java:74)
java.base@18/java.lang.Thread.run(Thread.java:833)
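The dump above is nodes hot threads output captured while the slow tasks were running. A minimal sketch of how it can be pulled, under the same assumptions (plain HTTP, no authentication, placeholder coordinates):

```scala
import scala.io.Source

// Placeholder coordinates; the real cluster uses HTTPS and authentication.
val esNode = "http://10.246.13.230:9200"
val nodeId = "U6wDSlMoTF2ipBgvkNPklQ" // node ID from the task list above; any affected node works

// Sample CPU usage on that node for 500ms and report the busiest threads.
val url = s"$esNode/_nodes/$nodeId/hot_threads?type=cpu&interval=500ms&threads=20"

println(Source.fromURL(url).mkString)
```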
What we have also checked:
- We ensured that no expensive queries were running when the issue occurred.
- We checked that no snapshot or restore operations were in progress.
- Disk usage on the affected nodes was below 60%.
- Each node in the cluster has about 200 open network connections.
- We suspect there could be a network problem, but we are not sure which metrics we should check.