Spark RDD save to Elasticsearch with HTTP request signing

I would like to add a document to elasticsearch with the spark application.
However, because HTTP request signing (AWS Signature V4) is not applied to the requests made by the Spark connector, the document cannot be added and the request fails with a permission error.
I would appreciate help with configuring request signing (as done for the RestHighLevelClient) either when setting up the Spark context or when saving the Spark RDD to Elasticsearch.

  • spark code
    def main(args: Array[String]): Unit = {
        val serviceName = "es"
        val aesEndpoint = "https://aesEndpoint.com"
        val aesPort = "443"
        val clientRegion = "clientRegion"
        val roleARN = "roleARN"
        val roleSessionName = "roleSessionName"

        // Destination index/type. Used below in saveToEs so the declared values
        // and the actual write target cannot drift apart (the original wrote the
        // literal "es-test/doc" while declaring type "_doc").
        val index = "es-test"
        val `type` = "_doc"
        val id = "8"

        val params = getParams(args)
        val sparkConf = new SparkConf().setAppName("preprocess_to_es_test").setMaster("local")
        sparkConf.set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
        sparkConf.set("spark.streaming.kafka.consumer.cache.enabled", "false")
        sparkConf.set("spark.network.timeout", "800s")
        sparkConf.set("spark.streaming.backpressure.enabled", "true")
        sparkConf.set("spark.streaming.kafka.maxRatePerPartition", "500")
        sparkConf.set("es.index.auto.create", "true")
        // Required for cloud/WAN endpoints: talk only to the declared node.
        sparkConf.set("es.nodes.wan.only", "true")
        sparkConf.set("es.batch.size.entries", "2500")
        sparkConf.set("es.nodes.discovery", "false")

        sparkConf.set("es.nodes", aesEndpoint)
        sparkConf.set("es.port", aesPort)

        try {
            // Creating the STS client is part of your trusted code. It has
            // the security credentials you use to obtain temporary security credentials.
            val stsClient = AWSSecurityTokenServiceClientBuilder
              .standard
              .withCredentials(new ProfileCredentialsProvider)
              .withRegion(clientRegion)
              .build()

            // Obtain temporary credentials for the IAM role. Note that you cannot
            // assume the role of an AWS root account; you must use credentials for
            // an IAM user or an IAM role.
            val roleRequest: AssumeRoleRequest = new AssumeRoleRequest()
              .withRoleArn(roleARN)
              .withRoleSessionName(roleSessionName)
            val roleResponse = stsClient.assumeRole(roleRequest)
            val sessionCredentials = roleResponse.getCredentials

            // Wrap the temporary STS credentials (key id / secret / session token)
            // in a static provider for the SigV4 signing interceptor.
            val awsCredentials: BasicSessionCredentials = new BasicSessionCredentials(
                sessionCredentials.getAccessKeyId,
                sessionCredentials.getSecretAccessKey,
                sessionCredentials.getSessionToken)
            val credentialsProvider: AWSCredentialsProvider = new AWSStaticCredentialsProvider(awsCredentials)
            val esClient: RestHighLevelClient = aesClient(serviceName, clientRegion, credentialsProvider)

            val sc = new SparkContext(sparkConf)
            val numbers = Map("one" -> 1, "two" -> 2, "three" -> 3, "load_at" -> LocalDateTime.now().toString)
            val airports = Map("OTP" -> "Otopeni", "SFO" -> "San Fran", "load_at" -> LocalDateTime.now().toString)

            // NOTE(review): saveToEs goes through es-hadoop's own internal REST
            // layer (org.elasticsearch.hadoop.rest.RestClient — visible in the 401
            // stack trace), NOT through the signed RestHighLevelClient built above.
            // The SigV4 interceptor therefore never applies to this write path;
            // signing must be injected into the es-hadoop transport (or the cluster
            // must accept the request some other way) for saveToEs to authenticate.
            sc.makeRDD(Seq(numbers, airports)).saveToEs(s"$index/${`type`}")
            esClient.close()
            println("The end")
        } catch {
            case e: AmazonServiceException =>
                // The call was transmitted successfully, but the service couldn't
                // process it, so it returned an error response.
                e.printStackTrace()
            case e: SdkClientException =>
                // The service couldn't be contacted for a response, or the client
                // couldn't parse the response from the service.
                e.printStackTrace()
            case e: IOException =>
                e.printStackTrace()
        }

        // Builds a RestHighLevelClient whose every request is signed with AWS
        // Signature V4 via an Apache HttpClient request interceptor.
        def aesClient(serviceName: String, region: String, credentialsProvider: AWSCredentialsProvider): RestHighLevelClient = {
            val signer = new AWS4Signer
            signer.setServiceName(serviceName)
            signer.setRegionName(region)
            val interceptor = new AWSRequestSigningApacheInterceptor(serviceName, signer, credentialsProvider)
            new RestHighLevelClient(RestClient.builder(HttpHost.create(aesEndpoint)).setHttpClientConfigCallback(new HttpClientConfigCallback {
                // BUGFIX: add the interceptor to the builder SUPPLIED by the
                // callback. The original returned HttpAsyncClientBuilder.create(),
                // a brand-new builder that discards everything the RestClient had
                // already configured — so the client actually used was never the
                // one carrying the signing interceptor's intended configuration.
                override def customizeHttpClient(httpAsyncClientBuilder: HttpAsyncClientBuilder): HttpAsyncClientBuilder = httpAsyncClientBuilder.addInterceptorLast(interceptor)
            }))
        }
    }
  • error message
Exception in thread "main" org.elasticsearch.hadoop.EsHadoopIllegalArgumentException: Cannot detect ES version - typically this happens if the network/Elasticsearch cluster is not accessible or when targeting a WAN/Cloud instance without the proper setting 'es.nodes.wan.only'
	at org.elasticsearch.hadoop.rest.InitializationUtils.discoverClusterInfo(InitializationUtils.java:340)
	at org.elasticsearch.spark.rdd.EsSpark$.doSaveToEs(EsSpark.scala:104)
	at org.elasticsearch.spark.rdd.EsSpark$.saveToEs(EsSpark.scala:79)
	at org.elasticsearch.spark.rdd.EsSpark$.saveToEs(EsSpark.scala:74)
	at org.elasticsearch.spark.package$SparkRDDFunctions.saveToEs(package.scala:55)
	at com.app.PreprocessToESNew$.main(PreprocessToESNew.scala:133)
	at com.app.PreprocessToESNew.main(PreprocessToESNew.scala)
Caused by: org.elasticsearch.hadoop.rest.EsHadoopInvalidRequest: [GET] on [] failed; server[https://aesEndpoint.com:443 ] returned [401|Unauthorized:]
	at org.elasticsearch.hadoop.rest.RestClient.checkResponse(RestClient.java:477)
	at org.elasticsearch.hadoop.rest.RestClient.execute(RestClient.java:434)
	at org.elasticsearch.hadoop.rest.RestClient.execute(RestClient.java:428)
	at org.elasticsearch.hadoop.rest.RestClient.execute(RestClient.java:388)
	at org.elasticsearch.hadoop.rest.RestClient.execute(RestClient.java:392)
	at org.elasticsearch.hadoop.rest.RestClient.get(RestClient.java:168)
	at org.elasticsearch.hadoop.rest.RestClient.mainInfo(RestClient.java:745)
	at org.elasticsearch.hadoop.rest.InitializationUtils.discoverClusterInfo(InitializationUtils.java:330)
	... 6 more

This topic was automatically closed 28 days after the last reply. New replies are no longer allowed.