I'm trying to connect from Spark to Elasticsearch. As a first step I built Docker images from Spark 3.2.1 for Kubernetes; then, from a Jupyter notebook, I opened a session to Spark and built the context to Elasticsearch.
On the Jupyter side I'm using the correct elasticsearch-spark JAR:
import os

# set PySpark environment variables
os.environ["JAVA_HOME"]= "/usr/lib/jvm/java-11-openjdk-amd64"
os.environ["SPARK_HOME"] = "/usr/local/spark"
os.environ['PYSPARK_PYTHON'] = '/opt/conda/bin/python'
os.environ['PYSPARK_DRIVER_PYTHON'] = '/opt/conda/bin/python'
os.environ['PYSPARK_SUBMIT_ARGS'] = "pyspark-shell"
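Just to confirm those paths are valid inside the notebook container, a trivial check:

print(os.path.exists(os.path.join(os.environ["JAVA_HOME"], "bin", "java")))
print(os.path.exists(os.path.join(os.environ["SPARK_HOME"], "jars")))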
from pyspark.sql import SparkSession

spark = SparkSession.builder\
.master('k8s://https://10.250.131.225:6443')\
.appName('SparkTestarrow')\
.config('spark.submit.deployMode', 'client')\
.config('spark.driver.host', 'notebook-service')\
.config("spark.driver.extraJavaOptions", "-Dio.netty.tryReflectionSetAccessible=true") \
.config("spark.executor.extraJavaOptions", "-Dio.netty.tryReflectionSetAccessible=true") \
.config('spark.executor.instances', '1')\
.config('spark.kubernetes.namespace', 'spark')\
.config('spark.kubernetes.container.image', 'af2.corpo.l/elk_nt_ml/pyspark-k8s-conda:v3.2.1_1')\
.config('spark.kubernetes.container.image.pullPolicy', 'Always')\
.config('spark.jars', '/opt/spark/jars/elasticsearch-spark-30_2.12-8.1.0.jar')\
.config("spark.sql.execution.arrow.pyspark.enabled", "true")\
.getOrCreate()
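Right after getOrCreate() I read the settings back to confirm what the driver registered (a minimal sanity check, nothing specific to this setup):

print(spark.version)
print(spark.sparkContext.getConf().get("spark.jars"))

On my side this shows 3.2.1 and the 8.1.0 connector JAR, which is why I believe the driver is fine.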
I also set up a SparkConf with the same cluster settings:

from pyspark.conf import SparkConf
conf = SparkConf() \
.setAppName("My Spark Application") \
.setMaster("k8s://https://10.250.131.225:6443") \
.set("spark.kubernetes.container.image", "af2.corpo.l/elk_nt_ml/pyspark-k8s-conda:v3.2.1_1") \
.set("spark.kubernetes.authenticate.driver.serviceAccountName", "notebook-service") \
.set('spark.jars', '/opt/spark/jars/elasticsearch-spark-30_2.12-8.1.0.jar') \
.set("spark.executor.instances", "1")
#spark elastic reader setup
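custom_schema is defined earlier in the notebook, roughly like this (a sketch: field1/field2 are just the names used in the select below, and the types are placeholders):

from pyspark.sql.types import StructType, StructField, StringType, TimestampType

custom_schema = StructType([
    StructField("@timestamp", TimestampType(), True),
    StructField("field1", StringType(), True),
    StructField("field2", StringType(), True),
])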
reader = spark.read.format("org.elasticsearch.spark.sql")\
.option("es.read.metadata", "false").option("es.nodes.wan.only","true")\
.option("es.net.ssl","false").option("es.nodes", "10.242.130.225").option("es.port", "9500")\
.option("es.net.http.auth.user", "elastic_writer")\
.option("es.net.http.auth.pass", "passw")\
.schema(custom_schema)
# .options(inferSchema='True')
df = reader.load('logstash-kpi--2023.02.21')\
.select("@timestamp","field1","field2")
df.show(5)
df.show(5) fails with:

Py4JJavaError: An error occurred while calling o123.showString.
: org.apache.spark.SparkException: Job aborted due to stage failure: Task 0 in stage 0.0 failed 4 times, most recent failure: Lost task 0.3 in stage 0.0 (TID 3) (10.42.0.249 executor 1): java.io.InvalidClassException: org.elasticsearch.spark.rdd.AbstractEsRDD; local class incompatible: stream classdesc serialVersionUID = -7881716463493414592, local class serialVersionUID = 721883410674778504
at java.base/java.io.ObjectStreamClass.initNonProxy(Unknown Source)
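If I read the serialVersionUID mismatch correctly, the driver and the executors are deserializing two different builds of org.elasticsearch.spark.rdd.AbstractEsRDD, i.e. two different connector JARs end up on the two classpaths. One way to compare the copies would be to hash the JAR in the notebook and inside the executor image (e.g. via kubectl exec into an executor pod); a minimal sketch, assuming the path from the configs above:

import hashlib

jar = "/opt/spark/jars/elasticsearch-spark-30_2.12-8.1.0.jar"
with open(jar, "rb") as f:
    print(hashlib.sha256(f.read()).hexdigest())

If the digests differ, the executor image presumably still ships an older elasticsearch-spark (or elasticsearch-hadoop) JAR under /opt/spark/jars that wins over the one passed via spark.jars.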
What's wrong? I'm using the same JAR on both the Jupyter and the Spark side (elasticsearch-spark-30_2.12-8.1.0.jar), matching Elasticsearch version 8.1.0.
Do you have any idea?