Connection timeout when RestHighLevelClient is called from Spark mapPartitions()

Hi Team,

We are getting "Connection timed out" while connecting to an Elasticsearch cluster (set up on GCP) through the RestHighLevelClient from Spark's mapPartitions() (running on Dataproc).

We use Elasticsearch for lookups: we run a search query for each row of the RDD/Dataset, and once we have the response we do some further cleansing.
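
In outline, the lookup looks roughly like this; the index name, query field, and input Dataset are simplified placeholders, not our exact code:

import java.util.ArrayList;
import java.util.List;

import org.apache.http.HttpHost;
import org.apache.spark.api.java.function.MapPartitionsFunction;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.catalyst.encoders.RowEncoder;
import org.elasticsearch.action.search.SearchRequest;
import org.elasticsearch.action.search.SearchResponse;
import org.elasticsearch.client.RequestOptions;
import org.elasticsearch.client.RestClient;
import org.elasticsearch.client.RestHighLevelClient;
import org.elasticsearch.index.query.QueryBuilders;
import org.elasticsearch.search.builder.SearchSourceBuilder;

static Dataset<Row> lookup(Dataset<Row> input) {
    return input.mapPartitions((MapPartitionsFunction<Row, Row>) rows -> {
        // Build the client inside the partition so it is created on the
        // executor rather than serialized from the driver.
        RestHighLevelClient client = new RestHighLevelClient(
                RestClient.builder(new HttpHost(
                        "sampletest-cluster.es.us-central1.gcp.cloud.es.io", 9243, "https")));
        List<Row> out = new ArrayList<>();
        try {
            while (rows.hasNext()) {
                Row row = rows.next();
                // One search per row, keyed on a lookup field.
                SearchRequest request = new SearchRequest("lookup-index")
                        .source(new SearchSourceBuilder()
                                .query(QueryBuilders.termQuery("key", row.getString(0))));
                SearchResponse response = client.search(request, RequestOptions.DEFAULT);
                // ... cleansing based on the response goes here ...
                out.add(row);
            }
        } finally {
            client.close();
        }
        return out.iterator();
    }, RowEncoder.apply(input.schema()));
}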

Error:

Exception in thread "main" org.apache.spark.SparkException: Job aborted due to stage failure: Task 0 in stage 2.0 failed 4 times, most recent failure: Lost task 0.3 in stage 2.0 (TID 5, sample-elastic-search-20220331115404-hrsqxn35nhabk-w-1.c.msampledev.internal, executor 1): ElasticsearchException[java.util.concurrent.ExecutionException: java.net.ConnectException: Timeout connecting to [sampletest-cluster.es.us-central1.gcp.cloud.es.io/3X.XXX.XXX.XX:9243]]; nested: ExecutionException[java.net.ConnectException: Timeout connecting to [sampletest-cluster.es.us-central1.gcp.cloud.es.io/3X.XXX.XXX.XX:9243]]; nested: ConnectException[Timeout connecting to [sampletest-cluster.es.us-central1.gcp.cloud.es.io/3X.XXX.XXX.XX:9243]];
	at org.elasticsearch.client.RestHighLevelClient.performClientRequest(RestHighLevelClient.java:2695)
	at org.elasticsearch.client.RestHighLevelClient.internalPerformRequest(RestHighLevelClient.java:2171)
	at org.elasticsearch.client.RestHighLevelClient.performRequest(RestHighLevelClient.java:2137)
	at org.elasticsearch.client.RestHighLevelClient.performRequestAndParseEntity(RestHighLevelClient.java:2105)
	at org.elasticsearch.client.RestHighLevelClient.search(RestHighLevelClient.java:1367)
	at com.test.sample.TestClass.query(TestClass.java:199)
	at com.test.sample.TestClass.access$000(TestClass.java:41)
	at com.test.sample.TestClass$3.call(TestClass.java:155)
	at org.apache.spark.sql.Dataset$$anonfun$43.apply(Dataset.scala:2666)
	at org.apache.spark.sql.Dataset$$anonfun$43.apply(Dataset.scala:2666)
	at org.apache.spark.sql.execution.MapPartitionsExec$$anonfun$5.apply(objects.scala:188)
	at org.apache.spark.sql.execution.MapPartitionsExec$$anonfun$5.apply(objects.scala:185)
	at org.apache.spark.rdd.RDD$$anonfun$mapPartitionsInternal$1$$anonfun$apply$24.apply(RDD.scala:858)
	at org.apache.spark.rdd.RDD$$anonfun$mapPartitionsInternal$1$$anonfun$apply$24.apply(RDD.scala:858)
	at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
	at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:346)
	at org.apache.spark.rdd.RDD.iterator(RDD.scala:310)
	at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
	at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:346)
	at org.apache.spark.rdd.RDD.iterator(RDD.scala:310)
	at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
	at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:346)
	at org.apache.spark.rdd.RDD.iterator(RDD.scala:310)
	at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
	at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:346)
	at org.apache.spark.rdd.RDD.iterator(RDD.scala:310)
	at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
	at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:346)
	at org.apache.spark.rdd.RDD.iterator(RDD.scala:310)
	at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:90)
	at org.apache.spark.scheduler.Task.run(Task.scala:123)
	at org.apache.spark.executor.Executor$TaskRunner$$anonfun$10.apply(Executor.scala:411)
	at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1360)
	at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:417)
	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
	at java.lang.Thread.run(Thread.java:748)
Caused by: java.util.concurrent.ExecutionException: java.net.ConnectException: Timeout connecting to [sampletest-cluster.es.us-central1.gcp.cloud.es.io/3X.XXX.XXX.XX:9243]
	at org.elasticsearch.common.util.concurrent.BaseFuture$Sync.getValue(BaseFuture.java:257)
	at org.elasticsearch.common.util.concurrent.BaseFuture$Sync.get(BaseFuture.java:244)
	at org.elasticsearch.common.util.concurrent.BaseFuture.get(BaseFuture.java:75)
	at org.elasticsearch.client.RestHighLevelClient.performClientRequest(RestHighLevelClient.java:2692)
	... 36 more
Caused by: java.net.ConnectException: Timeout connecting to [sampletest-cluster.es.us-central1.gcp.cloud.es.io/3X.XXX.XXX.XX:9243]
	at org.apache.http.nio.pool.RouteSpecificPool.timeout(RouteSpecificPool.java:169)
	at org.apache.http.nio.pool.AbstractNIOConnPool.requestTimeout(AbstractNIOConnPool.java:628)
	at org.apache.http.nio.pool.AbstractNIOConnPool$InternalSessionRequestCallback.timeout(AbstractNIOConnPool.java:894)
	at org.apache.http.impl.nio.reactor.SessionRequestImpl.timeout(SessionRequestImpl.java:184)
	at org.apache.http.impl.nio.reactor.DefaultConnectingIOReactor.processTimeouts(DefaultConnectingIOReactor.java:214)
	at org.apache.http.impl.nio.reactor.DefaultConnectingIOReactor.processEvents(DefaultConnectingIOReactor.java:158)
	at org.apache.http.impl.nio.reactor.AbstractMultiworkerIOReactor.execute(AbstractMultiworkerIOReactor.java:351)
	at org.apache.http.impl.nio.conn.PoolingNHttpClientConnectionManager.execute(PoolingNHttpClientConnectionManager.java:221)
	at org.apache.http.impl.nio.client.CloseableHttpAsyncClientBase$1.run(CloseableHttpAsyncClientBase.java:64)
	... 1 more

Language: Java 1.8

Spark version: 2.4.6

Elasticsearch version (Elastic Cloud): 7.13.2

Dependencies added:

<dependency>
    <groupId>org.elasticsearch.client</groupId>
    <artifactId>elasticsearch-rest-high-level-client</artifactId>
    <version>7.17.0</version>
</dependency>

<dependency>
    <groupId>com.fasterxml.jackson.core</groupId>
    <artifactId>jackson-databind</artifactId>
    <version>2.12.3</version>
</dependency>

Would be grateful for any suggestions to fix this ASAP.

Note: we are not using the Spark Elasticsearch connector.

Have you tried using curl to connect from each of your Spark nodes to the Elasticsearch cluster? Is it just that the Spark nodes can't reach the Elasticsearch nodes on port 9243?
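
For example, from an SSH session on each worker node (the endpoint is the one from your error message; credentials are placeholders):

curl -u elastic:<password> -v "https://sampletest-cluster.es.us-central1.gcp.cloud.es.io:9243"

If that also times out, the problem is networking/firewall rather than your Spark code.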


We didn't use curl commands to test, but we have checked that if we create the connection outside mapPartitions() and send a sample request, we get a response. If we do the same inside mapPartitions(), we get a connection timeout.

We use Dataproc to submit the Spark job. Since the code in the driver gets a response, the same calls inside mapPartitions() should also yield a response.

We have also referred to this:

Declaring the RestHighLevelClient as a private field leads to a serialization issue, so we declared it as transient.
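
In outline, the declaration looks like this (simplified; the class and host names are illustrative):

import org.apache.http.HttpHost;
import org.elasticsearch.client.RestClient;
import org.elasticsearch.client.RestHighLevelClient;

public class TestClass implements java.io.Serializable {

    // transient: the field is skipped during closure serialization, so each
    // executor JVM builds its own client on first use instead of receiving a
    // (non-serializable) instance from the driver.
    private transient RestHighLevelClient client;

    private RestHighLevelClient getClient() {
        if (client == null) {
            client = new RestHighLevelClient(RestClient.builder(
                    new HttpHost("sampletest-cluster.es.us-central1.gcp.cloud.es.io", 9243, "https")));
        }
        return client;
    }
}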

You might have serialization problems, but I do not think that is what is causing a timeout when trying to connect to Elasticsearch; you would see a serialization-related stack trace for a serialization problem. The timeout is much more likely due to Elasticsearch not running on port 9243 on 3X.XXX.XXX.XX, or a firewall rule blocking you. I would check that each Spark node is able to connect to Elasticsearch on that port. Once you have checked that you can connect to Elasticsearch with something like curl, and assuming you are running with security enabled (over https), it's probably also worth checking that you have that configured properly for your Spark job.
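
For reference, a typical way to configure the high-level client against a secured https endpoint with basic auth looks roughly like this (host and credentials are placeholders):

import org.apache.http.HttpHost;
import org.apache.http.auth.AuthScope;
import org.apache.http.auth.UsernamePasswordCredentials;
import org.apache.http.impl.client.BasicCredentialsProvider;
import org.elasticsearch.client.RestClient;
import org.elasticsearch.client.RestHighLevelClient;

// Register the basic-auth credentials the async HTTP client should present.
BasicCredentialsProvider credentialsProvider = new BasicCredentialsProvider();
credentialsProvider.setCredentials(AuthScope.ANY,
        new UsernamePasswordCredentials("elastic", "<password>"));

// Note the "https" scheme on the HttpHost: connecting with plain http to an
// https endpoint can also surface as a connection failure.
RestHighLevelClient client = new RestHighLevelClient(
        RestClient.builder(new HttpHost(
                "sampletest-cluster.es.us-central1.gcp.cloud.es.io", 9243, "https"))
            .setHttpClientConfigCallback(httpClientBuilder ->
                httpClientBuilder.setDefaultCredentialsProvider(credentialsProvider)));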


This topic was automatically closed 28 days after the last reply. New replies are no longer allowed.