Writing to ECK with PySpark throws an SSLHandshakeException

Hi everyone,

I created an Elastic Cloud on Kubernetes (ECK) deployment, and I'm getting an error when writing a one-row DataFrame to a new index from PySpark.

For context, I'm running everything on Google Cloud Platform (GCP): PySpark 3.3 runs on Dataproc with the elasticsearch-hadoop connector preconfigured (elasticsearch-spark-30_2.12:8.12.0). For ECK, I created a new Google Kubernetes Engine (GKE) cluster, installed the operator and its CRDs using the Helm chart (helm install elastic-operator elastic/eck-operator), and applied the following manifest with kubectl:

apiVersion: elasticsearch.k8s.elastic.co/v1
kind: Elasticsearch
metadata:
  name: test
spec:
  version: 8.12.2
  http:
    service:
      spec:
        type: LoadBalancer
  nodeSets:
  - name: masters
    count: 3
    config:
      node.roles: ["master"]
  - name: data
    count: 8
    volumeClaimTemplates:
    - metadata:
        name: elasticsearch-data
      spec:
        ...
    config:
      node.roles: ["data", "ingest"]
    podTemplate:
      spec:
        initContainers:
        - name: sysctl
          securityContext:
            privileged: true
            runAsUser: 0
          command: ['sh', '-c', 'sysctl -w vm.max_map_count=262144']
        containers:
        - name: elasticsearch
          resources:
            ...
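(An aside on the Dataproc side: the connector came preconfigured for me. If it doesn't on your cluster, something along these lines should pull it in via Maven at session startup; this is a hypothetical sketch using the coordinate mentioned above, not a step I had to take:)

from pyspark.sql import SparkSession

# Only needed if elasticsearch-spark is not already on the cluster's classpath.
spark = (
    SparkSession.builder
    .master("yarn")
    .appName("Foo")
    .config("spark.jars.packages", "org.elasticsearch:elasticsearch-spark-30_2.12:8.12.0")
    .getOrCreate()
)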

The LoadBalancer is created successfully and I can curl the cluster:

curl -k -v -u elastic:$PASSWORD https://$IP:9200/

The output shows that it's using a self-signed certificate:

* SSL certificate verify result: self signed certificate in certificate chain (19), continuing anyway

I can also extract the public certificate with

kubectl get secret "test-es-http-certs-public" -o go-template='{{index .data "tls.crt" | base64decode }}' > tls.crt

and use it to curl, which also works fine.
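(For completeness, that second check is just standard curl pointed at the extracted CA file:)

curl --cacert tls.crt -u elastic:$PASSWORD https://$IP:9200/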

Now, when I try to run the following code from a Jupyter cell:

from pyspark.sql import SparkSession

spark = SparkSession.builder.master("yarn").appName("Foo").getOrCreate()
data = [{'name': 'Alice', 'age': 1}]
df = spark.createDataFrame(data)

options = {
    "es.index.auto.create": "true",
    "es.net.http.auth.user": "elastic",
    "es.net.http.auth.pass": "<password>",
    "es.nodes": "https://<loadbalancer_ip>:9200",
    "es.nodes.wan.only": "true",
    "es.nodes.discovery": "false",
    "es.net.ssl.cert.allow.self.signed": "true",
    "es.net.ssl":"true",
    "es.resource": "foo/",
}

df.write.mode("overwrite").format("org.elasticsearch.spark.sql").options(**options).save()

I get the following error:

ERROR NetworkClient: Node [<loadbalancer_ip>:9200] failed (javax.net.ssl.SSLHandshakeException: java.security.cert.CertPathValidatorException: Trust anchor for certification path not found.); no other nodes left - aborting...

Note: I have not created a tls.crt file on my PySpark cluster.

Some things I've tried:

  • tls.selfSignedCertificate.disabled = true (in the manifest) - this only renders the cluster unreachable (LibreSSL/3.3.6: error:1404B42E:SSL routines:ST_CONNECT:tlsv1 alert protocol version), presumably because it disables TLS on the HTTP layer entirely while curl still speaks https
  • es.net.ssl.cert.allow.self.signed = false - no effect, the handshake error is unchanged

I suspect I need to provide a certificate in one of these configs, but it's not clear to me whether that's needed and, if so, how to do it.

I would appreciate any help or pointers.

Thanks,
Aldo

Solved it. In case anyone stumbles on this thread and has the same problem, here are the steps I followed:

  1. In the init script of my Dataproc cluster, I pull the cert file: kubectl get secret "test-es-http-certs-public" -o go-template='{{index .data "tls.crt" | base64decode }}' > /tmp/tls.crt
  2. Then I import it into the JVM's default truststore: sudo keytool -import -alias elastic -storepass <default_pass> -noprompt -keystore $JAVA_HOME/lib/security/cacerts -file /tmp/tls.crt
  3. Finally, in PySpark I did the following:

import os

# In my case JAVA_HOME is /usr/lib/jvm/temurin-11-jdk-amd64
JAVA_HOME = os.getenv("JAVA_HOME")

options = {
    "es.index.auto.create": "true",
    "es.net.http.auth.user": "elastic",
    "es.net.http.auth.pass": "<password>",
    "es.nodes": "https://<loadbalancer_ip>:9200",
    "es.nodes.wan.only": "true",
    "es.net.ssl": "true",
    "es.net.ssl.cert.allow.self.signed": "true",
    # Point the connector at the JVM keystore that now contains the ECK cert
    "es.net.ssl.keystore.location": f"file://{JAVA_HOME}/lib/security/cacerts",
    "es.net.ssl.keystore.pass": "<default_pass>",
    "es.resource": "foo/",
}

df.write.mode("overwrite").format("org.elasticsearch.spark.sql").options(**options).save()
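To sanity-check the write, the index can be read back through the same connector. A minimal sketch, assuming the SparkSession and the options dict above are still in scope:

# Read the index back to confirm both the TLS setup and the write worked.
df_check = (
    spark.read
    .format("org.elasticsearch.spark.sql")
    .options(**{k: v for k, v in options.items() if k != "es.resource"})
    .load("foo")  # read the index written above
)
df_check.show()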