Hi all,
I get this error when I try to connect from Spark (running locally) to Elasticsearch (running as a cluster):
19/09/14 20:17:32 INFO HttpMethodDirector: I/O exception (java.net.ConnectException) caught when processing request: Connection timed out: connect
19/09/14 20:17:32 INFO HttpMethodDirector: Retrying request
19/09/14 20:17:53 INFO HttpMethodDirector: I/O exception (java.net.ConnectException) caught when processing request: Connection timed out: connect
19/09/14 20:17:53 INFO HttpMethodDirector: Retrying request
19/09/14 20:18:14 INFO HttpMethodDirector: I/O exception (java.net.ConnectException) caught when processing request: Connection timed out: connect
19/09/14 20:18:14 INFO HttpMethodDirector: Retrying request
19/09/14 20:18:35 ERROR NetworkClient: Node [xx.xx.xx.10:7474] failed (Connection timed out: connect); selected next node [xx.xx.xx.11:7474]
19/09/14 20:18:56 INFO HttpMethodDirector: I/O exception (java.net.ConnectException) caught when processing request: Connection timed out: connect
19/09/14 20:18:56 INFO HttpMethodDirector: Retrying request
19/09/14 20:21:23 ERROR NetworkClient: Node [xx.xx.xx.12:7474] failed (Connection timed out: connect); no other nodes left - aborting...
19/09/14 20:21:23 ERROR Executor: Exception in task 0.0 in stage 1.0 (TID 1)
org.elasticsearch.hadoop.rest.EsHadoopNoNodesLeftException: Connection error (check network and/or proxy settings)- all nodes failed; tried [[xx.xx.xx.10:7474, xx.xx.xx.11:7474, xx.xx.xx.12:7474]]
at org.elasticsearch.hadoop.rest.NetworkClient.execute(NetworkClient.java:149)
at org.elasticsearch.hadoop.rest.RestClient.execute(RestClient.java:461)
at org.elasticsearch.hadoop.rest.RestClient.executeNotFoundAllowed(RestClient.java:469)
at org.elasticsearch.hadoop.rest.RestClient.exists(RestClient.java:565)
at org.elasticsearch.hadoop.rest.RestClient.indexExists(RestClient.java:560)
at org.elasticsearch.hadoop.rest.RestClient.touch(RestClient.java:571)
at org.elasticsearch.hadoop.rest.RestRepository.touch(RestRepository.java:418)
at org.elasticsearch.hadoop.rest.RestService.initSingleIndex(RestService.java:609)
at org.elasticsearch.hadoop.rest.RestService.createWriter(RestService.java:597)
at org.elasticsearch.spark.rdd.EsRDDWriter.write(EsRDDWriter.scala:58)
at org.elasticsearch.spark.rdd.EsSpark$anonfun$doSaveToEs$1.apply(EsSpark.scala:107)
at org.elasticsearch.spark.rdd.EsSpark$anonfun$doSaveToEs$1.apply(EsSpark.scala:107)
at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:87)
at org.apache.spark.scheduler.Task.run(Task.scala:108)
at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:335)
at java.util.concurrent.ThreadPoolExecutor.runWorker(Unknown Source)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(Unknown Source)
at java.lang.Thread.run(Unknown Source)
19/09/14 20:21:23 WARN TaskSetManager: Lost task 0.0 in stage 1.0 (TID 1, localhost, executor driver): org.elasticsearch.hadoop.rest.EsHadoopNoNodesLeftException: Connection error (check network and/or proxy settings)- all nodes failed; tried [[xx.xx.xx.10:7474, xx.xx.xx.11:7474, xx.xx.xx.12:7474]]
at org.elasticsearch.hadoop.rest.NetworkClient.execute(NetworkClient.java:149)
at org.elasticsearch.hadoop.rest.RestClient.execute(RestClient.java:461)
at org.elasticsearch.hadoop.rest.RestClient.executeNotFoundAllowed(RestClient.java:469)
at org.elasticsearch.hadoop.rest.RestClient.exists(RestClient.java:565)
at org.elasticsearch.hadoop.rest.RestClient.indexExists(RestClient.java:560)
at org.elasticsearch.hadoop.rest.RestClient.touch(RestClient.java:571)
at org.elasticsearch.hadoop.rest.RestRepository.touch(RestRepository.java:418)
at org.elasticsearch.hadoop.rest.RestService.initSingleIndex(RestService.java:609)
at org.elasticsearch.hadoop.rest.RestService.createWriter(RestService.java:597)
at org.elasticsearch.spark.rdd.EsRDDWriter.write(EsRDDWriter.scala:58)
at org.elasticsearch.spark.rdd.EsSpark$anonfun$doSaveToEs$1.apply(EsSpark.scala:107)
at org.elasticsearch.spark.rdd.EsSpark$anonfun$doSaveToEs$1.apply(EsSpark.scala:107)
at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:87)
at org.apache.spark.scheduler.Task.run(Task.scala:108)
at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:335)
at java.util.concurrent.ThreadPoolExecutor.runWorker(Unknown Source)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(Unknown Source)
at java.lang.Thread.run(Unknown Source)
The code looks like this:
SparkConf conf = new SparkConf()
        .setAppName("ES-Spark")
        .setMaster("local[*]")
        .set("es.nodes", "cluster-xxxx.xx.xx.xx")
        .set("es.port", "50467")
        .set("es.resource", "recommendations_2/recommendation")
        .set("es.resource.read", "products-20190912/product")
        .set("es.resource.write", "recommendations_2/recommendation");
JavaSparkContext ctx = new JavaSparkContext(conf);
JavaStreamingContext jsc = new JavaStreamingContext(ctx, new Duration(10000));
JavaInputDStream<ConsumerRecord<String, String>> kafkaStream = Utils.getKafkaStream(jsc);
JavaDStream<String> allInfo = kafkaStream.map(f -> f.value());
JavaEsSparkStreaming.saveToEs(allInfo, "recommendations_2/recommendation");
jsc.start();
jsc.awaitTermination();
It looks like the connector reaches the cluster and discovers the individual nodes (xx.xx.xx.10-12 on port 7474), but it then cannot connect to any of them...
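If I understand the connector settings correctly, when the individual data nodes are not directly reachable from the client machine, node discovery should be disabled with es.nodes.wan.only so that every request goes through the address in es.nodes. The sketch below is what I would expect to need (host and port are placeholders, not my real values); please correct me if that is the wrong direction:

SparkConf conf = new SparkConf()
        .setAppName("ES-Spark")
        .setMaster("local[*]")
        // Placeholder coordinating-node address that is reachable from this machine
        .set("es.nodes", "cluster-xxxx.xx.xx.xx")
        .set("es.port", "50467")
        // Do not discover/contact the cluster-internal node addresses;
        // route every request through es.nodes instead
        .set("es.nodes.wan.only", "true")
        .set("es.resource.read", "products-20190912/product")
        .set("es.resource.write", "recommendations_2/recommendation");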
The elasticsearch-hadoop version used is:
<dependency>
    <groupId>org.elasticsearch</groupId>
    <artifactId>elasticsearch-spark-20_2.10</artifactId>
    <version>5.6.2</version>
</dependency>
When I connect via the Transport Client instead, everything works and the connection succeeds.
I have already checked elasticsearch.yml, and the http.port property is set to the correct port.
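For context, the Transport Client uses the transport port (9300 by default), while elasticsearch-hadoop talks to the HTTP/REST port, so the two tests are not equivalent. A quick standalone check I can run from the Spark machine to see whether the HTTP port is reachable at all would be something like this (host and port are placeholders):

import java.net.HttpURLConnection;
import java.net.URL;

public class EsHttpCheck {
    public static void main(String[] args) throws Exception {
        // Placeholder address: the same host/port passed to es.nodes / es.port
        URL url = new URL("http://cluster-xxxx.xx.xx.xx:50467/");
        HttpURLConnection conn = (HttpURLConnection) url.openConnection();
        conn.setConnectTimeout(5000);
        conn.setReadTimeout(5000);
        conn.setRequestMethod("GET");
        // HTTP 200 with the cluster banner means the REST port is reachable;
        // a connect timeout here would reproduce the error in the Spark logs
        System.out.println("HTTP status: " + conn.getResponseCode());
    }
}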