Unable to connect Elasticsearch with Spark

Hi all,

I get this error when I try to connect Spark (running locally) to Elasticsearch (running in a cluster):

19/09/14 20:17:32 INFO HttpMethodDirector: I/O exception (java.net.ConnectException) caught when processing request: Connection timed out: connect
19/09/14 20:17:32 INFO HttpMethodDirector: Retrying request
19/09/14 20:17:53 INFO HttpMethodDirector: I/O exception (java.net.ConnectException) caught when processing request: Connection timed out: connect
19/09/14 20:17:53 INFO HttpMethodDirector: Retrying request
19/09/14 20:18:14 INFO HttpMethodDirector: I/O exception (java.net.ConnectException) caught when processing request: Connection timed out: connect
19/09/14 20:18:14 INFO HttpMethodDirector: Retrying request
19/09/14 20:18:35 ERROR NetworkClient: Node [xx.xx.xx.10:7474] failed (Connection timed out: connect); selected next node [xx.xx.xx.11:7474]
19/09/14 20:18:56 INFO HttpMethodDirector: I/O exception (java.net.ConnectException) caught when processing request: Connection timed out: connect
19/09/14 20:18:56 INFO HttpMethodDirector: Retrying request
19/09/14 20:21:23 ERROR NetworkClient: Node [xx.xx.xx.12:7474] failed (Connection timed out: connect); no other nodes left - aborting...
19/09/14 20:21:23 ERROR Executor: Exception in task 0.0 in stage 1.0 (TID 1)
org.elasticsearch.hadoop.rest.EsHadoopNoNodesLeftException: Connection error (check network and/or proxy settings)- all nodes failed; tried [[xx.xx.xx.10:7474, xx.xx.xx.11:7474, xx.xx.xx.12:7474]] 
    at org.elasticsearch.hadoop.rest.NetworkClient.execute(NetworkClient.java:149)
    at org.elasticsearch.hadoop.rest.RestClient.execute(RestClient.java:461)
    at org.elasticsearch.hadoop.rest.RestClient.executeNotFoundAllowed(RestClient.java:469)
    at org.elasticsearch.hadoop.rest.RestClient.exists(RestClient.java:565)
    at org.elasticsearch.hadoop.rest.RestClient.indexExists(RestClient.java:560)
    at org.elasticsearch.hadoop.rest.RestClient.touch(RestClient.java:571)
    at org.elasticsearch.hadoop.rest.RestRepository.touch(RestRepository.java:418)
    at org.elasticsearch.hadoop.rest.RestService.initSingleIndex(RestService.java:609)
    at org.elasticsearch.hadoop.rest.RestService.createWriter(RestService.java:597)
    at org.elasticsearch.spark.rdd.EsRDDWriter.write(EsRDDWriter.scala:58)
    at org.elasticsearch.spark.rdd.EsSpark$anonfun$doSaveToEs$1.apply(EsSpark.scala:107)
    at org.elasticsearch.spark.rdd.EsSpark$anonfun$doSaveToEs$1.apply(EsSpark.scala:107)
    at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:87)
    at org.apache.spark.scheduler.Task.run(Task.scala:108)
    at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:335)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(Unknown Source)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(Unknown Source)
    at java.lang.Thread.run(Unknown Source)
19/09/14 20:21:23 WARN TaskSetManager: Lost task 0.0 in stage 1.0 (TID 1, localhost, executor driver): org.elasticsearch.hadoop.rest.EsHadoopNoNodesLeftException: Connection error (check network and/or proxy settings)- all nodes failed; tried [[xx.xx.xx.10:7474, xx.xx.xx.11:7474, xx.xx.xx.12:7474]] 
    at org.elasticsearch.hadoop.rest.NetworkClient.execute(NetworkClient.java:149)
    at org.elasticsearch.hadoop.rest.RestClient.execute(RestClient.java:461)
    at org.elasticsearch.hadoop.rest.RestClient.executeNotFoundAllowed(RestClient.java:469)
    at org.elasticsearch.hadoop.rest.RestClient.exists(RestClient.java:565)
    at org.elasticsearch.hadoop.rest.RestClient.indexExists(RestClient.java:560)
    at org.elasticsearch.hadoop.rest.RestClient.touch(RestClient.java:571)
    at org.elasticsearch.hadoop.rest.RestRepository.touch(RestRepository.java:418)
    at org.elasticsearch.hadoop.rest.RestService.initSingleIndex(RestService.java:609)
    at org.elasticsearch.hadoop.rest.RestService.createWriter(RestService.java:597)
    at org.elasticsearch.spark.rdd.EsRDDWriter.write(EsRDDWriter.scala:58)
    at org.elasticsearch.spark.rdd.EsSpark$anonfun$doSaveToEs$1.apply(EsSpark.scala:107)
    at org.elasticsearch.spark.rdd.EsSpark$anonfun$doSaveToEs$1.apply(EsSpark.scala:107)
    at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:87)
    at org.apache.spark.scheduler.Task.run(Task.scala:108)
    at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:335)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(Unknown Source)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(Unknown Source)
    at java.lang.Thread.run(Unknown Source)

The code looks like:

SparkConf conf = new SparkConf()
		.setAppName("ES-Spark")
		.setMaster("local[*]")
		.set("es.nodes", "cluster-xxxx.xx.xx.xx")
		.set("es.port", "50467")
		.set("es.resource", "recommendations_2/recommendation")
		.set("es.resource.read", "products-20190912/product")
		.set("es.resource.write", "recommendations_2/recommendation")
		;
JavaSparkContext ctx = new JavaSparkContext(conf);
JavaStreamingContext jsc = new JavaStreamingContext(ctx, new Duration(10000));
JavaInputDStream<ConsumerRecord<String, String>> kafkaStream = Utils.getKafkaStream(jsc);       
JavaDStream<String> allInfo = kafkaStream.map(f -> f.value());
JavaEsSparkStreaming.saveToEs(allInfo, "recommendations_2/recommendation");   
jsc.start();
jsc.awaitTermination();

It seems the connector reaches the cluster, but it cannot connect back to the individual nodes...
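
In case it helps, here is a minimal reachability check I can run from the Spark machine. The host and port are placeholders taken from the node addresses in the log above, and the 5-second timeout is just an arbitrary value:

import java.net.InetSocketAddress;
import java.net.Socket;

public class EsReachabilityCheck {
    public static void main(String[] args) throws Exception {
        // Placeholder host/port -- substitute one of the node addresses from the log above
        String host = "xx.xx.xx.10";
        int port = 7474;
        try (Socket socket = new Socket()) {
            // Fail fast with a 5-second timeout instead of waiting for the OS default
            socket.connect(new InetSocketAddress(host, port), 5000);
            System.out.println("Connected to " + host + ":" + port);
        } catch (Exception e) {
            System.out.println("Cannot reach " + host + ":" + port + " -> " + e);
        }
    }
}

If this also times out, the problem would be at the network level rather than in the connector configuration.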

The elasticsearch-hadoop version used is:

<dependency>
  <groupId>org.elasticsearch</groupId>
  <artifactId>elasticsearch-spark-20_2.10</artifactId>
  <version>5.6.2</version>
</dependency>

When I connect via the Transport Client instead, everything works fine and I am able to reach the cluster.
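
For reference, the working Transport Client connection looks roughly like this; the cluster name, host, and the default transport port 9300 are placeholders for my actual values:

import java.net.InetAddress;

import org.elasticsearch.client.transport.TransportClient;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.transport.InetSocketTransportAddress;
import org.elasticsearch.transport.client.PreBuiltTransportClient;

public class TransportClientCheck {
    public static void main(String[] args) throws Exception {
        // "my-cluster", the host, and port 9300 are placeholders for the real values
        Settings settings = Settings.builder()
                .put("cluster.name", "my-cluster")
                .build();
        try (TransportClient client = new PreBuiltTransportClient(settings)
                .addTransportAddress(new InetSocketTransportAddress(
                        InetAddress.getByName("cluster-xxxx.xx.xx.xx"), 9300))) {
            // Listing the connected nodes confirms the transport-level connection works
            System.out.println("Connected nodes: " + client.connectedNodes());
        }
    }
}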

I have already checked the elasticsearch.yml file, and the http.port property is set to the correct port.
