Hi Team,
We are getting "Connection timed out " while connecting to elastic cluster [setup on GCP] through resthighlevel client from spark mapPartitions() [using dataproc].
We are using Elasticsearch for lookup; basically we perform search query on each row from rdd/dataset; once we have response we further do some cleansing process.
Error:
Exception in thread "main" org.apache.spark.SparkException: Job aborted due to stage failure: Task 0 in stage 2.0 failed 4 times, most recent failure: Lost task 0.3 in stage 2.0 (TID 5, sample-elastic-search-20220331115404-hrsqxn35nhabk-w-1.c.msampledev.internal, executor 1): ElasticsearchException[java.util.concurrent.ExecutionException: java.net.ConnectException: Timeout connecting to [sampletest-cluster.es.us-central1.gcp.cloud.es.io/3X.XXX.XXX.XX:9243]]; nested: ExecutionException[java.net.ConnectException: Timeout connecting to [sampletest-cluster.es.us-central1.gcp.cloud.es.io/3X.XXX.XXX.XX:9243]]; nested: ConnectException[Timeout connecting to [sampletest-cluster.es.us-central1.gcp.cloud.es.io/3X.XXX.XXX.XX:9243]];
at org.elasticsearch.client.RestHighLevelClient.performClientRequest(RestHighLevelClient.java:2695)
at org.elasticsearch.client.RestHighLevelClient.internalPerformRequest(RestHighLevelClient.java:2171)
at org.elasticsearch.client.RestHighLevelClient.performRequest(RestHighLevelClient.java:2137)
at org.elasticsearch.client.RestHighLevelClient.performRequestAndParseEntity(RestHighLevelClient.java:2105)
at org.elasticsearch.client.RestHighLevelClient.search(RestHighLevelClient.java:1367)
at com.test.sample.TestClass.query(TestClass.java:199)
at com.test.sample.TestClass.access$000(TestClass.java:41)
at com.test.sample.TestClass$3.call(TestClass.java:155)
at org.apache.spark.sql.Dataset$$anonfun$43.apply(Dataset.scala:2666)
at org.apache.spark.sql.Dataset$$anonfun$43.apply(Dataset.scala:2666)
at org.apache.spark.sql.execution.MapPartitionsExec$$anonfun$5.apply(objects.scala:188)
at org.apache.spark.sql.execution.MapPartitionsExec$$anonfun$5.apply(objects.scala:185)
at org.apache.spark.rdd.RDD$$anonfun$mapPartitionsInternal$1$$anonfun$apply$24.apply(RDD.scala:858)
at org.apache.spark.rdd.RDD$$anonfun$mapPartitionsInternal$1$$anonfun$apply$24.apply(RDD.scala:858)
at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:346)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:310)
at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:346)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:310)
at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:346)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:310)
at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:346)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:310)
at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:346)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:310)
at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:90)
at org.apache.spark.scheduler.Task.run(Task.scala:123)
at org.apache.spark.executor.Executor$TaskRunner$$anonfun$10.apply(Executor.scala:411)
at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1360)
at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:417)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
at java.lang.Thread.run(Thread.java:748)
Caused by: java.util.concurrent.ExecutionException: java.net.ConnectException: Timeout connecting to [sampletest-cluster.es.us-central1.gcp.cloud.es.io/3X.XXX.XXX.XX:9243]
at org.elasticsearch.common.util.concurrent.BaseFuture$Sync.getValue(BaseFuture.java:257)
at org.elasticsearch.common.util.concurrent.BaseFuture$Sync.get(BaseFuture.java:244)
at org.elasticsearch.common.util.concurrent.BaseFuture.get(BaseFuture.java:75)
at org.elasticsearch.client.RestHighLevelClient.performClientRequest(RestHighLevelClient.java:2692)
... 36 more
Caused by: java.net.ConnectException: Timeout connecting to [sampletest-cluster.es.us-central1.gcp.cloud.es.io/3X.XXX.XXX.XX:9243]
at org.apache.http.nio.pool.RouteSpecificPool.timeout(RouteSpecificPool.java:169)
at org.apache.http.nio.pool.AbstractNIOConnPool.requestTimeout(AbstractNIOConnPool.java:628)
at org.apache.http.nio.pool.AbstractNIOConnPool$InternalSessionRequestCallback.timeout(AbstractNIOConnPool.java:894)
at org.apache.http.impl.nio.reactor.SessionRequestImpl.timeout(SessionRequestImpl.java:184)
at org.apache.http.impl.nio.reactor.DefaultConnectingIOReactor.processTimeouts(DefaultConnectingIOReactor.java:214)
at org.apache.http.impl.nio.reactor.DefaultConnectingIOReactor.processEvents(DefaultConnectingIOReactor.java:158)
at org.apache.http.impl.nio.reactor.AbstractMultiworkerIOReactor.execute(AbstractMultiworkerIOReactor.java:351)
at org.apache.http.impl.nio.conn.PoolingNHttpClientConnectionManager.execute(PoolingNHttpClientConnectionManager.java:221)
at org.apache.http.impl.nio.client.CloseableHttpAsyncClientBase$1.run(CloseableHttpAsyncClientBase.java:64)
... 1 more
Language: java 1.8
Spark version: 2.4.6
Elastic version [on cloud]: 7.13.2
Dependency added :
<dependency>
<groupId>org.elasticsearch.client</groupId>
<artifactId>elasticsearch-rest-high-level-client</artifactId>
<version>7.17.0</version>
</dependency>
<dependency>
<groupId>com.fasterxml.jackson.core</groupId>
<artifactId>jackson-databind</artifactId>
<version>2.12.3</version>
</dependency>
Would be grateful to have suggestion to fix this ASAP?
Note: we are not using spark Elasticsearch connector