Writing PySpark dataframe to Elastic Cloud (Cannot detect ES version)

Hi there!

My use case

  • Run a PySpark job on EMR Serverless that reads data from S3 and writes it into Elastic Cloud.

Errors

  • Cannot detect ES version - typically this happens if the network/Elasticsearch cluster is not accessible or when targeting a WAN/Cloud instance without the proper setting 'es.nodes.wan.only'. This is raised when attempting to write the PySpark dataframe to Elastic Cloud (v8.7.0).
  • ERROR NetworkClient: Node [Radar_DB:XXXX:9243] failed (java.net.UnknownHostException: Radar_DB); no other nodes left - aborting...
  • I see this error when running my job locally or on EMR Serverless.

Notes:

  • The code works fine when writing data from PySpark into a locally running ES instance (v8.7.0).
  • It also works if I collect the dataframe and then use the Python Elasticsearch client with the bulk helper (rough sketch below).
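
For reference, the working fallback looks roughly like this (credentials are placeholders, and df is the dataframe from the snippet below):

from elasticsearch import Elasticsearch
from elasticsearch.helpers import bulk

# Collect the dataframe on the driver and index the rows with the bulk helper.
es = Elasticsearch(cloud_id="MY-CLOUD-ID", api_key="MY-API-KEY")
actions = [{"_index": "temp", "_source": row.asDict()} for row in df.collect()]
bulk(es, actions)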

Code snippet:

from pyspark.sql import SparkSession

cloud_id = "MY-CLOUD-ID"
api_key = "MY-API-KEY"
es_index = "temp"

spark = SparkSession.builder \
    .appName("Write to Elasticsearch") \
    .getOrCreate()

data = [("1", "Apple"), ("2", "Amazon")]
df = spark.createDataFrame(data, ["company_id", "company_name"])


df.write.format("org.elasticsearch.spark.sql") \
    .option("es.nodes.wan.only", "true") \
    .option("es.nodes", cloud_id) \
    .option("es.port", "9243") \
    .option("es.nodes.version", "8") \
    .option("es.net.ssl", "true") \
    .option("es.net.http.auth.user", "elastic") \
    .option("es.net.http.auth.pass", api_key) \
    .option("es.resource", es_index) \
    .mode("overwrite") \
    .save()

Dependencies:

  • Spark 3.3.2
  • Scala 2.12
  • Hadoop 3
  • JAR: elasticsearch-spark-30_2.12-8.7.0.jar
  • Elastic Cloud v8.7.0

Help

I would appreciate any ideas on how to fix it :slight_smile:

When you're running locally, can you ping Radar_DB from your machine (or whichever machine the spark tasks are running on)? The message makes it sound like your machine can't find an IP address for Radar_DB.

Hi @Keith_Massey !

That's what I got:

PING nlb-proxy-prod-us-east-2-v2-0775b3e35e2dc05d.elb.us-east-2.amazonaws.com (18.224.99.228): 56 data bytes
Request timeout for icmp_seq 0
Request timeout for icmp_seq 1
Request timeout for icmp_seq 2
Request timeout for icmp_seq 3
Request timeout for icmp_seq 4
Request timeout for icmp_seq 5

At the same time, I am able to perform:

from elasticsearch import Elasticsearch
from elasticsearch.helpers import bulk

es = Elasticsearch(cloud_id="Radar_DB:XXX", api_key="XXX")
bulk(es, data)

Oh, sorry, I think I misunderstood! Es-hadoop/spark doesn't work with cloud_id. You have to give it a list of hostnames or IP addresses.
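
For example, something along these lines (the hostname and password are made-up placeholders; es.nodes just needs a resolvable Elasticsearch endpoint rather than a cloud ID):

# Hypothetical sketch: es.nodes takes hostnames/IPs, not a cloud ID.
df.write.format("org.elasticsearch.spark.sql") \
    .option("es.nodes.wan.only", "true") \
    .option("es.nodes", "my-cluster.es.us-east-2.aws.example.com") \
    .option("es.port", "9243") \
    .option("es.net.ssl", "true") \
    .option("es.net.http.auth.user", "elastic") \
    .option("es.net.http.auth.pass", "MY-PASSWORD") \
    .option("es.resource", es_index) \
    .mode("overwrite") \
    .save()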

I see. Another rookie question: where can I find them in Elastic Cloud? :slight_smile:

That's actually a very good question with a complicated answer! I believe you can only access the ELB in cloud, and it round-robins to all of the Elasticsearch nodes. That's a little unfortunate because that means it appears as a single node to es-hadoop, and if es-hadoop (or spark) gets too many failures from a node, it will blacklist the node for the remainder of the job. Since there's only one "node", this means that es-hadoop will bail out of your job if it sees a small number of failures, even if in reality you have dozens of nodes behind the load balancer.
One way that people have worked around this is to create several aliases for the load balancer (in their own DNS or even locally in /etc/hosts), and to list the same load balancer multiple times in es.nodes, using the different aliases. I've written this up somewhere before, but I don't remember where. I'll link to it from here if I find it.
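
Roughly, the idea looks like this (the alias names and credentials are made up; the IP is whatever the load balancer hostname resolves to):

# Hypothetical /etc/hosts entries, all pointing at the same Cloud load balancer:
#   18.224.99.228   es-proxy-a
#   18.224.99.228   es-proxy-b
#   18.224.99.228   es-proxy-c
# Listing every alias in es.nodes makes es-hadoop treat them as separate nodes,
# so a few failures on one alias don't blacklist the only "node" and abort the job.
df.write.format("org.elasticsearch.spark.sql") \
    .option("es.nodes.wan.only", "true") \
    .option("es.nodes", "es-proxy-a,es-proxy-b,es-proxy-c") \
    .option("es.port", "9243") \
    .option("es.net.ssl", "true") \
    .option("es.net.http.auth.user", "elastic") \
    .option("es.net.http.auth.pass", "MY-PASSWORD") \
    .option("es.resource", es_index) \
    .mode("overwrite") \
    .save()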


Thank you! I'd appreciate it.
Meanwhile, I will try it on my end and post the results here.
