I am running Spark on my local machine, and I have Elasticsearch up and running in the AWS Elasticsearch Service. I am trying to follow the documentation here: https://www.elastic.co/guide/en/elasticsearch/hadoop/master/spark.html
The version of elasticsearch-spark that I am using is:
<dependency>
    <groupId>org.elasticsearch</groupId>
    <artifactId>elasticsearch-spark-20_2.10</artifactId>
    <version>7.5.0</version>
</dependency>
This is what my SparkConf looks like:
SparkConf conf = new SparkConf().setMaster("local[*]").setAppName(properties.getProperty("app.name"))
        .set("es.nodes", "search-**********.us-west-1.es.amazonaws.com")
        .set("es.port", "443")
        .set("es.http.timeout", "5m")
        .set("es.nodes.wan.only", "true");

// Call the method that sends the logs to ES; assume stringResults is a JavaDStream<Map<String, Object>>
ElasticSearchManager.sendToEs(stringResults);
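For completeness, this is roughly how the conf feeds the streaming job (a minimal sketch, not my exact code: the class name, the literal app name, the batch interval, and the socket source are placeholders; my real stream comes from elsewhere):

import java.util.Collections;
import java.util.Map;
import org.apache.spark.SparkConf;
import org.apache.spark.streaming.Durations;
import org.apache.spark.streaming.api.java.JavaDStream;
import org.apache.spark.streaming.api.java.JavaStreamingContext;

public class StreamingDriver {
    public static void main(String[] args) throws InterruptedException {
        SparkConf conf = new SparkConf().setMaster("local[*]").setAppName("es-stream-test")
                .set("es.nodes", "search-**********.us-west-1.es.amazonaws.com")
                .set("es.port", "443")
                .set("es.http.timeout", "5m")
                .set("es.nodes.wan.only", "true");
        JavaStreamingContext jssc = new JavaStreamingContext(conf, Durations.seconds(10));
        // Placeholder source: turn each incoming text line into a one-field document
        JavaDStream<Map<String, Object>> stringResults = jssc
                .socketTextStream("localhost", 9999)
                .map(line -> Collections.<String, Object>singletonMap("message", line));
        ElasticSearchManager.sendToEs(stringResults);
        jssc.start();
        jssc.awaitTermination();
    }
}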
This is how I am trying to store the data in Elasticsearch:
import static org.elasticsearch.spark.streaming.api.java.JavaEsSparkStreaming.saveToEs;
import java.time.LocalDateTime;
import java.time.ZoneId;
import java.time.ZonedDateTime;
import java.time.format.DateTimeFormatter;
import java.util.Map;
import org.apache.spark.streaming.api.java.JavaDStream;

public class ElasticSearchManager {
    public static void sendToEs(JavaDStream<Map<String, Object>> javaDStream) {
        // Index into a date-named index; "yyyy" is the calendar year ("YYYY" is the week-based year, which differs around New Year)
        ZonedDateTime dateTime = LocalDateTime.now().atZone(ZoneId.systemDefault());
        saveToEs(javaDStream, dateTime.format(DateTimeFormatter.ofPattern("yyyy-MM-dd")));
    }
}
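One thing I noticed while writing this up: the resource string is computed once, on the driver, when sendToEs is called, so a long-running stream would keep writing to the index for the date the job started. For reference, a self-contained check of what that string evaluates to (the class name is mine, just for illustration):

import java.time.LocalDateTime;
import java.time.ZoneId;
import java.time.ZonedDateTime;
import java.time.format.DateTimeFormatter;

public class IndexNameCheck {
    public static void main(String[] args) {
        ZonedDateTime now = LocalDateTime.now().atZone(ZoneId.systemDefault());
        // Calendar year vs. week-based year: identical except near New Year
        System.out.println(now.format(DateTimeFormatter.ofPattern("yyyy-MM-dd"))); // e.g. 2019-12-27
        System.out.println(now.format(DateTimeFormatter.ofPattern("YYYY-MM-dd"))); // usually the same value
    }
}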
This is the error I get:
org.elasticsearch.hadoop.EsHadoopIllegalArgumentException: Cannot detect ES version - typically this happens if the network/Elasticsearch cluster is not accessible or when targeting a WAN/Cloud instance without the proper setting 'es.nodes.wan.only'
at org.elasticsearch.hadoop.rest.InitializationUtils.discoverClusterInfo(InitializationUtils.java:340)
at org.elasticsearch.spark.rdd.EsSpark$.doSaveToEs(EsSpark.scala:104)
at org.elasticsearch.spark.streaming.EsSparkStreaming$$anonfun$doSaveToEs$1.apply(EsSparkStreaming.scala:71)
at org.elasticsearch.spark.streaming.EsSparkStreaming$$anonfun$doSaveToEs$1.apply(EsSparkStreaming.scala:71)
at org.apache.spark.streaming.dstream.DStream.$anonfun$foreachRDD$2(DStream.scala:628)
at org.apache.spark.streaming.dstream.DStream.$anonfun$foreachRDD$2$adapted(DStream.scala:628)
at org.apache.spark.streaming.dstream.ForEachDStream.$anonfun$generateJob$2(ForEachDStream.scala:51)
at scala.runtime.java8.JFunction0$mcV$sp.apply(JFunction0$mcV$sp.java:23)
at org.apache.spark.streaming.dstream.DStream.createRDDWithLocalProperties(DStream.scala:416)
at org.apache.spark.streaming.dstream.ForEachDStream.$anonfun$generateJob$1(ForEachDStream.scala:51)
at scala.runtime.java8.JFunction0$mcV$sp.apply(JFunction0$mcV$sp.java:23)
at scala.util.Try$.apply(Try.scala:213)
at org.apache.spark.streaming.scheduler.Job.run(Job.scala:39)
at org.apache.spark.streaming.scheduler.JobScheduler$JobHandler.$anonfun$run$1(JobScheduler.scala:257)
at scala.runtime.java8.JFunction0$mcV$sp.apply(JFunction0$mcV$sp.java:23)
at scala.util.DynamicVariable.withValue(DynamicVariable.scala:62)
at org.apache.spark.streaming.scheduler.JobScheduler$JobHandler.run(JobScheduler.scala:257)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
at java.lang.Thread.run(Thread.java:748)
Caused by: org.elasticsearch.hadoop.rest.EsHadoopInvalidRequest: [GET] on [] failed; server[search-************.us-west-1.es.amazonaws.com:443] returned [400|Bad Request:]
at org.elasticsearch.hadoop.rest.RestClient.checkResponse(RestClient.java:477)
at org.elasticsearch.hadoop.rest.RestClient.execute(RestClient.java:434)
at org.elasticsearch.hadoop.rest.RestClient.execute(RestClient.java:428)
at org.elasticsearch.hadoop.rest.RestClient.execute(RestClient.java:388)
at org.elasticsearch.hadoop.rest.RestClient.execute(RestClient.java:392)
at org.elasticsearch.hadoop.rest.RestClient.get(RestClient.java:168)
at org.elasticsearch.hadoop.rest.RestClient.mainInfo(RestClient.java:745)
at org.elasticsearch.hadoop.rest.InitializationUtils.discoverClusterInfo(InitializationUtils.java:330)
... 19 more
I tried to debug the issue, and this is what I found at line 745 of org.elasticsearch.hadoop.rest.RestClient:
Map<String, Object> result = get("", null);
I am not sure why they would pass an empty string as the URI to the get method; as far as I can tell, it just resolves to a GET on the cluster root, which is how the connector detects the Elasticsearch version. I am stuck at this point and don't have a good path forward.
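For anyone who wants to reproduce that handshake outside of Spark, this is roughly the same request in plain Java (a minimal sketch; the endpoint below is a placeholder for my actual domain):

import java.io.BufferedReader;
import java.io.InputStream;
import java.io.InputStreamReader;
import java.net.HttpURLConnection;
import java.net.URL;

public class RootEndpointCheck {
    public static void main(String[] args) throws Exception {
        // Placeholder: substitute the real AWS Elasticsearch domain
        URL root = new URL("https://search-**********.us-west-1.es.amazonaws.com:443/");
        HttpURLConnection conn = (HttpURLConnection) root.openConnection();
        conn.setRequestMethod("GET");
        int status = conn.getResponseCode();
        System.out.println("HTTP " + status);
        InputStream body = status < 400 ? conn.getInputStream() : conn.getErrorStream();
        if (body != null) {
            try (BufferedReader in = new BufferedReader(new InputStreamReader(body))) {
                String line;
                while ((line = in.readLine()) != null) {
                    // A reachable cluster returns a JSON banner containing "version"
                    System.out.println(line);
                }
            }
        }
    }
}

Any help would be appreciated.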