Spark to AWS ElasticSearch Service

I am running Spark on my local machine, and I have Elasticsearch up and running in the AWS Elasticsearch Service. I am trying to follow the documentation here: https://www.elastic.co/guide/en/elasticsearch/hadoop/master/spark.html

This is the version of elasticsearch-spark I am using:

    <dependency>
        <groupId>org.elasticsearch</groupId>
        <artifactId>elasticsearch-spark-20_2.10</artifactId>
        <version>7.5.0</version>
    </dependency>

This is what my SparkConf looks like:

    SparkConf conf = new SparkConf().setMaster("local[*]").setAppName(properties.getProperty("app.name"))
            .set("es.nodes", "search-**********.us-west-1.es.amazonaws.com")
            .set("es.port", "443")
            .set("es.http.timeout", "5m")
            .set("es.nodes.wan.only", "true");

    // Call the method to send logs to ES; stringResults is a JavaDStream<Map<String, Object>>
    ElasticSearchManager.sendToEs(stringResults);
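For context, here is a minimal sketch of how stringResults and the streaming context could be wired together. The socket source, batch interval, field names, and the StreamingJob class are placeholders for illustration, not taken from the actual job:

    import java.util.HashMap;
    import java.util.Map;

    import org.apache.spark.SparkConf;
    import org.apache.spark.streaming.Durations;
    import org.apache.spark.streaming.api.java.JavaDStream;
    import org.apache.spark.streaming.api.java.JavaStreamingContext;

    public class StreamingJob {
        public static void main(String[] args) throws InterruptedException {
            SparkConf conf = new SparkConf().setMaster("local[*]").setAppName("es-demo")
                    .set("es.nodes", "search-**********.us-west-1.es.amazonaws.com")
                    .set("es.port", "443")
                    .set("es.nodes.wan.only", "true");

            JavaStreamingContext ssc = new JavaStreamingContext(conf, Durations.seconds(10));

            // Hypothetical source: log lines from a socket, mapped into ES documents
            JavaDStream<Map<String, Object>> stringResults = ssc
                    .socketTextStream("localhost", 9999)
                    .map(line -> {
                        Map<String, Object> doc = new HashMap<>();
                        doc.put("message", line);
                        return doc;
                    });

            ElasticSearchManager.sendToEs(stringResults);

            ssc.start();
            ssc.awaitTermination();
        }
    }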

This is how I am trying to store the data in Elasticsearch:

    import static org.elasticsearch.spark.streaming.api.java.JavaEsSparkStreaming.saveToEs;

    import java.time.LocalDateTime;
    import java.time.ZoneId;
    import java.time.ZonedDateTime;
    import java.time.format.DateTimeFormatter;
    import java.util.Map;

    import org.apache.spark.streaming.api.java.JavaDStream;

    public class ElasticSearchManager {

        public static void sendToEs(JavaDStream<Map<String, Object>> javaDStream) {
            ZonedDateTime dateTime = LocalDateTime.now().atZone(ZoneId.systemDefault());
            // Write to a daily index named after the current date;
            // "yyyy" (year-of-era) rather than "YYYY" (week-based year)
            saveToEs(javaDStream,
                    dateTime.format(DateTimeFormatter.ofPattern("yyyy-MM-dd")));
        }
    }
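One detail worth flagging in that index name: in java.time patterns, "YYYY" is the week-based year, which jumps to the next year for the last few days of December, while "yyyy" is the ordinary year-of-era that a daily index name almost certainly wants (which is why the snippet above uses it). A quick illustration:

    import java.time.LocalDate;
    import java.time.format.DateTimeFormatter;

    public class YearPatternDemo {
        public static void main(String[] args) {
            // 2019-12-30 falls in week 1 of 2020, so the week-based year differs
            LocalDate d = LocalDate.of(2019, 12, 30);
            System.out.println(d.format(DateTimeFormatter.ofPattern("YYYY-MM-dd"))); // 2020-12-30
            System.out.println(d.format(DateTimeFormatter.ofPattern("yyyy-MM-dd"))); // 2019-12-30
        }
    }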

This is the error I get:

org.elasticsearch.hadoop.EsHadoopIllegalArgumentException: Cannot detect ES version - typically this happens if the network/Elasticsearch cluster is not accessible or when targeting a WAN/Cloud instance without the proper setting 'es.nodes.wan.only'
    at org.elasticsearch.hadoop.rest.InitializationUtils.discoverClusterInfo(InitializationUtils.java:340)
    at org.elasticsearch.spark.rdd.EsSpark$.doSaveToEs(EsSpark.scala:104)
    at org.elasticsearch.spark.streaming.EsSparkStreaming$anonfun$doSaveToEs$1.apply(EsSparkStreaming.scala:71)
    at org.elasticsearch.spark.streaming.EsSparkStreaming$anonfun$doSaveToEs$1.apply(EsSparkStreaming.scala:71)
    at org.apache.spark.streaming.dstream.DStream.$anonfun$foreachRDD$2(DStream.scala:628)
    at org.apache.spark.streaming.dstream.DStream.$anonfun$foreachRDD$2$adapted(DStream.scala:628)
    at org.apache.spark.streaming.dstream.ForEachDStream.$anonfun$generateJob$2(ForEachDStream.scala:51)
    at scala.runtime.java8.JFunction0$mcV$sp.apply(JFunction0$mcV$sp.java:23)
    at org.apache.spark.streaming.dstream.DStream.createRDDWithLocalProperties(DStream.scala:416)
    at org.apache.spark.streaming.dstream.ForEachDStream.$anonfun$generateJob$1(ForEachDStream.scala:51)
    at scala.runtime.java8.JFunction0$mcV$sp.apply(JFunction0$mcV$sp.java:23)
    at scala.util.Try$.apply(Try.scala:213)
    at org.apache.spark.streaming.scheduler.Job.run(Job.scala:39)
    at org.apache.spark.streaming.scheduler.JobScheduler$JobHandler.$anonfun$run$1(JobScheduler.scala:257)
    at scala.runtime.java8.JFunction0$mcV$sp.apply(JFunction0$mcV$sp.java:23)
    at scala.util.DynamicVariable.withValue(DynamicVariable.scala:62)
    at org.apache.spark.streaming.scheduler.JobScheduler$JobHandler.run(JobScheduler.scala:257)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
    at java.lang.Thread.run(Thread.java:748)
Caused by: org.elasticsearch.hadoop.rest.EsHadoopInvalidRequest: [GET] on [] failed; server[search-************.us-west-1.es.amazonaws.com:443] returned [400|Bad Request:]
    at org.elasticsearch.hadoop.rest.RestClient.checkResponse(RestClient.java:477)
    at org.elasticsearch.hadoop.rest.RestClient.execute(RestClient.java:434)
    at org.elasticsearch.hadoop.rest.RestClient.execute(RestClient.java:428)
    at org.elasticsearch.hadoop.rest.RestClient.execute(RestClient.java:388)
    at org.elasticsearch.hadoop.rest.RestClient.execute(RestClient.java:392)
    at org.elasticsearch.hadoop.rest.RestClient.get(RestClient.java:168)
    at org.elasticsearch.hadoop.rest.RestClient.mainInfo(RestClient.java:745)
    at org.elasticsearch.hadoop.rest.InitializationUtils.discoverClusterInfo(InitializationUtils.java:330)
    ... 19 more

I tried to debug the issue, and this is what I found in org.elasticsearch.hadoop.rest.RestClient.java at line 745:

    Map<String, Object> result = get("", null);

Not sure why they would set the URI in the get method to an empty string. I am stuck at this point and don't have a good path forward. Any help would be appreciated.

That method performs the "main" action on the cluster, which should return the cluster's name, UUID, and version number. ES-Hadoop performs this action first to determine which APIs and features are available for it to use. Most likely this is an issue with how AWS Elasticsearch accepts traffic to the cluster.
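One thing worth checking in that direction: by default ES-Hadoop talks plain HTTP, and a plain-HTTP request sent to a TLS-only endpoint on port 443 commonly comes back as a 400 Bad Request, which matches the error above. If that is the case here (an assumption about this domain, not something confirmed in the post), enabling SSL in the configuration should help:

    SparkConf conf = new SparkConf().setMaster("local[*]").setAppName("es-demo")
            .set("es.nodes", "search-**********.us-west-1.es.amazonaws.com")
            .set("es.port", "443")
            .set("es.nodes.wan.only", "true")
            // es.net.ssl is off by default; needed when the endpoint only speaks HTTPS
            .set("es.net.ssl", "true");

Independently, the root call that ES-Hadoop makes can be reproduced outside of Spark to see exactly what the endpoint returns (a sketch; the endpoint below is the placeholder from the post):

    import java.io.BufferedReader;
    import java.io.InputStream;
    import java.io.InputStreamReader;
    import java.net.HttpURLConnection;
    import java.net.URL;

    public class RootCheck {
        public static void main(String[] args) throws Exception {
            // GET / over HTTPS — the same "main" action ES-Hadoop issues at startup,
            // which should return the cluster name, uuid, and version number
            URL url = new URL("https://search-**********.us-west-1.es.amazonaws.com/");
            HttpURLConnection con = (HttpURLConnection) url.openConnection();
            int status = con.getResponseCode();
            System.out.println("HTTP " + status);
            InputStream body = status < 400 ? con.getInputStream() : con.getErrorStream();
            try (BufferedReader in = new BufferedReader(new InputStreamReader(body))) {
                String line;
                while ((line = in.readLine()) != null) {
                    System.out.println(line);
                }
            }
        }
    }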
