Jupyter spark connect to elasticsearch

I'm trying to connect from Spark to Elasticsearch. As a first step I built Docker images from Spark 3.2.1 for Kubernetes; then, from a Jupyter notebook, I opened a session to Spark and built the context to Elasticsearch.

On the Jupyter side I'm using the correct elasticsearch-spark jar:

import os
from pyspark.sql import SparkSession

# set pyspark environment variables
os.environ["JAVA_HOME"]= "/usr/lib/jvm/java-11-openjdk-amd64"
os.environ["SPARK_HOME"] = "/usr/local/spark"
os.environ['PYSPARK_PYTHON'] = '/opt/conda/bin/python'
os.environ['PYSPARK_DRIVER_PYTHON'] = '/opt/conda/bin/python' 
os.environ['PYSPARK_SUBMIT_ARGS'] = "pyspark-shell"
spark = SparkSession.builder\
    .master('k8s://https://10.250.131.225:6443')\
    .appName('SparkTestarrow')\
    .config('spark.submit.deployMode', 'client')\
    .config('spark.driver.host', 'notebook-service')\
    .config("spark.driver.extraJavaOptions", "-Dio.netty.tryReflectionSetAccessible=true") \
    .config("spark.executor.extraJavaOptions", "-Dio.netty.tryReflectionSetAccessible=true") \
    .config('spark.executor.instances', '1')\
    .config('spark.kubernetes.namespace', 'spark')\
    .config('spark.kubernetes.container.image', 'af2.corpo.l/elk_nt_ml/pyspark-k8s-conda:v3.2.1_1')\
    .config('spark.kubernetes.container.image.pullPolicy', 'Always')\
    .config('spark.jars', '/opt/spark/jars/elasticsearch-spark-30_2.12-8.1.0.jar')\
    .config("spark.sql.execution.arrow.pyspark.enabled", "true")\
    .getOrCreate()
from pyspark.conf import SparkConf
conf = SparkConf() \
        .setAppName("My Spark Application") \
        .setMaster("k8s://https://10.250.131.225:6443") \
        .set("spark.kubernetes.container.image", "af2.corpo.l/elk_nt_ml/pyspark-k8s-conda:v3.2.1_1") \
        .set("spark.kubernetes.authenticate.driver.serviceAccountName", "notebook-service") \
        .set('spark.jars', '/opt/spark/jars/elasticsearch-spark-30_2.12-8.1.0.jar') \
        .set("spark.executor.instances", "1")
#spark elastic reader setup
reader = spark.read.format("org.elasticsearch.spark.sql")\
    .option("es.read.metadata", "false").option("es.nodes.wan.only","true")\
    .option("es.net.ssl","false").option("es.nodes", "10.242.130.225").option("es.port", "9500")\
    .option("es.net.http.auth.user", "elastic_writer")\
    .option("es.net.http.auth.pass", "passw")\
    .schema(custom_schema)
    # .options(inferSchema='True')
df = reader.load('logstash-kpi--2023.02.21')\
     .select("@timestamp","field1","field2")
df.show(5)
Py4JJavaError: An error occurred while calling o123.showString.
: org.apache.spark.SparkException: Job aborted due to stage failure: Task 0 in stage 0.0 failed 4 times, most recent failure: Lost task 0.3 in stage 0.0 (TID 3) (10.42.0.249 executor 1): java.io.InvalidClassException: org.elasticsearch.spark.rdd.AbstractEsRDD; local class incompatible: stream classdesc serialVersionUID = -7881716463493414592, local class serialVersionUID = 721883410674778504
	at java.base/java.io.ObjectStreamClass.initNonProxy(Unknown Source)

What's wrong? I'm using the correct jar on both the Jupyter and Spark side (elasticsearch-spark-30_2.12-8.1.0.jar), matching Elasticsearch version 8.1.0.

Do you have any idea?

Hmm this looks similar to BUG: Loggers in spark/core should be transient · Issue #958 · elastic/elasticsearch-hadoop · GitHub. The thinking there was that this was caused by two different versions of logging jars, and that was supposedly fixed by marking the logger field as transient in #958 - added transient to loggers where necessary by xjrk58 · Pull Request #959 · elastic/elasticsearch-hadoop · GitHub. Can you check whether you happen to have different versions of apache-commons-logging jars on the different machines? This looks like a bug in es-spark, but I think we'll need help identifying it.

If I'm checking in the right place, it seems that Jupyter and Spark have the same jars:

root@sparktestarrow-b6467e8693512448-exec-1:/opt/spark/jars# ls -ltr | grep logging
-rw-r--r--. 1 root root    62050 Jan 20  2022 commons-logging-1.1.3.jar
-rw-r--r--. 1 root root    12483 Jan 20  2022 logging-interceptor-3.12.12.jar
(base) root@notebook-deployment-5dd5b59747-6kt9f:/usr/local/spark/jars# ls -ltr | grep logging
-rw-r--r--. 1 root root    62050 Jan 20  2022 commons-logging-1.1.3.jar
-rw-r--r--. 1 root root    12483 Jan 20  2022 logging-interceptor-3.12.12.jar

What else should I verify?
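Rather than eyeballing `ls` output, one way to check this systematically is to fingerprint every jar on both sides and diff the results. A stdlib-only sketch (the directories are the ones from this thread; run it inside each container and compare):

```python
import hashlib
import pathlib

def jar_fingerprints(jar_dir):
    """Return {jar name: sha256 hex digest} for every jar in jar_dir."""
    return {
        p.name: hashlib.sha256(p.read_bytes()).hexdigest()
        for p in sorted(pathlib.Path(jar_dir).glob("*.jar"))
    }

# On the executor image:  jar_fingerprints("/opt/spark/jars")
# On the notebook image:  jar_fingerprints("/usr/local/spark/jars")
# Any jar whose digest differs, or that exists on only one side, is suspect.
```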

This (I believe) would be the version on one of the nodes running spark executors and the version on the driver (where your jupyter notebook is). And since we don't really know what is causing the problem, it's possible that the problem is some jar other than logging -- maybe the jvm itself? Or scala?

Let's see. Could this slight difference in Java versions (11.0.16 vs 11.0.15) really have such an effect?



root@sparktestarrow-b6467e8693512448-exec-1:/opt/spark/bin# ./spark-shell
WARNING: An illegal reflective access operation has occurred
WARNING: Illegal reflective access by org.apache.spark.unsafe.Platform (file:/opt/spark/jars/spark-unsafe_2.12-3.2.1.jar) to constructor java.nio.DirectByteBuffer(long,int)
WARNING: Please consider reporting this to the maintainers of org.apache.spark.unsafe.Platform
WARNING: Use --illegal-access=warn to enable warnings of further illegal reflective access operations
WARNING: All illegal access operations will be denied in a future release
Using Spark's default log4j profile: org/apache/spark/log4j-defaults.properties
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
23/02/27 15:21:10 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
Spark context Web UI available at http://sparktestarrow-b6467e8693512448-exec-1:4040
Spark context available as 'sc' (master = local[*], app id = local-1677511270751).
Spark session available as 'spark'.
Welcome to
      ____              __
     / __/__  ___ _____/ /__
    _\ \/ _ \/ _ `/ __/  '_/
   /___/ .__/\_,_/_/ /_/\_\   version 3.2.1
      /_/

Using Scala version 2.12.15 (OpenJDK 64-Bit Server VM, Java 11.0.16)
Type in expressions to have them evaluated.
Type :help for more information.

scala>
(base) root@notebook-deployment-5dd5b59747-6kt9f:/usr/local/spark/bin# ./spark-shell
WARNING: An illegal reflective access operation has occurred
WARNING: Illegal reflective access by org.apache.spark.unsafe.Platform (file:/usr/local/spark-3.2.1-bin-hadoop3.2/jars/spark-unsafe_2.12-3.2.1.jar) to constructor java.nio.DirectByteBuffer(long,int)
WARNING: Please consider reporting this to the maintainers of org.apache.spark.unsafe.Platform
WARNING: Use --illegal-access=warn to enable warnings of further illegal reflective access operations
WARNING: All illegal access operations will be denied in a future release
Using Spark's default log4j profile: org/apache/spark/log4j-defaults.properties
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
23/02/27 15:19:17 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
23/02/27 15:19:18 WARN Utils: Service 'SparkUI' could not bind on port 4040. Attempting port 4041.
Spark context Web UI available at http://notebook-deployment-5dd5b59747-6kt9f:4041
Spark context available as 'sc' (master = local[*], app id = local-1677511158180).
Spark session available as 'spark'.
Welcome to
      ____              __
     / __/__  ___ _____/ /__
    _\ \/ _ \/ _ `/ __/  '_/
   /___/ .__/\_,_/_/ /_/\_\   version 3.2.1
      /_/

Using Scala version 2.12.15 (OpenJDK 64-Bit Server VM, Java 11.0.15)
Type in expressions to have them evaluated.
Type :help for more information.

scala>

There are several ways to run spark and I'm not sure which one you're using. But the classpath for spark-shell on the executor node is not necessarily the same as the one that's actually used by the executors. I'm not familiar with running spark in kubernetes, so I don't know what the best advice is for getting the classpath of the executors or driver. If you're able to ssh onto the nodes you might be able to get it with something like ps -ef | grep spark.

00:00:07 /usr/local/openjdk-11/bin/java -Dio.netty.tryReflectionSetAccessible=true -Dspark.driver.port=35417 -Xms1024m -Xmx1024m -cp /opt/spark/conf::/opt/spark/jars/*: org.apache.spark.executor.CoarseGrainedExecutorBackend --driver-url spark://CoarseGrainedScheduler@notebook-service:35417 --executor-id 1 --cores 1 --app-id spark-application-1677537920957 --hostname 10.42.0.10 --resourceProfileId 0
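Since the `InvalidClassException` names a specific class, another way to narrow this down is to hash that exact `.class` entry inside the connector jar on the driver and on the executor: if the digests differ, the two sides are loading different builds of the class. A stdlib sketch (the jar path and entry name below are assumptions derived from the error message):

```python
import hashlib
import zipfile

def class_digest(jar_path, entry):
    """SHA-256 of a single .class entry inside a jar."""
    with zipfile.ZipFile(jar_path) as jar:
        return hashlib.sha256(jar.read(entry)).hexdigest()

# Run on both sides and compare, e.g.:
# class_digest("/opt/spark/jars/elasticsearch-spark-30_2.12-8.1.0.jar",
#              "org/elasticsearch/spark/rdd/AbstractEsRDD.class")
```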

I've also changed which jars are installed. It turns out Spark doesn't accept the other elasticsearch jars if you only want to use elasticsearch-spark:

Caused by: java.lang.RuntimeException: Multiple ES-Hadoop versions detected in the classpath; please use only one
jar:file:/usr/local/spark-3.2.1-bin-hadoop3.2/jars/elasticsearch-spark-30_2.12-8.1.0.jar
jar:file:/usr/local/spark-3.2.1-bin-hadoop3.2/jars/elasticsearch-hadoop-8.1.1.jar
jar:file:/usr/local/spark-3.2.1-bin-hadoop3.2/jars/elasticsearch-hadoop-hive-8.1.1.jar
jar:file:/usr/local/spark-3.2.1-bin-hadoop3.2/jars/elasticsearch-hadoop-mr-8.1.1.jar
jar:file:/usr/local/spark-3.2.1-bin-hadoop3.2/jars/elasticsearch-hadoop-pig-8.1.1.jar
jar:file:/usr/local/spark-3.2.1-bin-hadoop3.2/jars/elasticsearch-spark-20_2.11-8.1.1.jar
jar:file:/usr/local/spark-3.2.1-bin-hadoop3.2/jars/elasticsearch-storm-8.1.1.jar

	at org.elasticsearch.hadoop.util.Version.<clinit>(Version.java:98)
	... 38 more

but I'm still facing the same error:

times, most recent failure: Lost task 0.3 in stage 0.0 (TID 3) (10.42.0.10 executor 1): java.io.InvalidClassException: org.elasticsearch.spark.rdd.AbstractEsRDD; local class incompatible: stream classdesc serialVersionUID = -7881716463493414592, local class serialVersionUID = 721883410674778504

even though I've deleted the unnecessary jars and set

os.environ['PYSPARK_SUBMIT_ARGS'] = '--jars elasticsearch-spark-30_2.12-8.1.0.jar pyspark-shell'

I'm not sure which unnecessary jars you've deleted, but the only elasticsearch-hadoop artifact that you want to have in your classpath (driver or executor) is elasticsearch-spark-30_2.12-8.1.0.jar.
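A quick way to audit a jars directory for stray artifacts (stdlib sketch; the glob pattern matches the artifact naming shown in the error above):

```python
import pathlib

def es_hadoop_jars(jar_dir):
    """List all elasticsearch-* jars in jar_dir; more than one entry
    will trigger the 'Multiple ES-Hadoop versions detected' error."""
    return sorted(p.name for p in pathlib.Path(jar_dir).glob("elasticsearch-*.jar"))
```

The returned list should contain exactly one name, elasticsearch-spark-30_2.12-8.1.0.jar, on both the driver and the executor images.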

On the Jupyter notebook side, the session was executed this way:

usr/lib/jvm/java-11-openjdk-amd64/bin/java -cp /usr/local/spark/conf/:/usr/local/spark/jars/* -Xmx1g -Dio.netty.tryReflectionSetAccessible=true org.apache.spark.deploy.SparkSubmit --conf spark.kubernetes.container.image.pullPolicy=Always --conf spark.sql.execution.arrow.pyspark.enabled=true --conf spark.master=k8s://https://10.250.131.225:6443 --conf spark.kubernetes.namespace=spark --conf spark.kubernetes.container.image=af2.corpo.l/elk_nt_ml/pyspark-k8s-conda:v3.2.1_1 --conf spark.app.name=SparkTestarrow --conf spark.submit.deployMode=client --conf spark.executor.extraJavaOptions=-Dio.netty.tryReflectionSetAccessible=true --conf spark.executor.instances=1 --conf spark.driver.host=notebook-service --conf spark.jars=/opt/spark/jars/elasticsearch-spark-30_2.12-8.1.0.jar --conf spark.driver.extraJavaOptions=-Dio.netty.tryReflectionSetAccessible=true --jars elasticsearch-spark-30_2.12-8.1.0.jar pyspark-shell

OK, I've fixed this case by building the Jupyter image based on
jupyter/all-spark-notebook:spark-3.3.2,
with the second image built from Apache Spark 3.3.2. But please note that I'm using Scala 2.12.

Glad to hear you were able to solve it! Any idea which jar was causing the problem? We'd love to be able to fix the code to handle this if possible.

I think it was indeed related to a mismatch between the Scala version on the Jupyter side and the Scala version on the Spark pod. If you are using elasticsearch-spark-30_2.12-8.1.0.jar, you need to have only this jar from the elasticsearch org; otherwise you get:

Caused by: java.lang.RuntimeException: Multiple ES-Hadoop versions detected in the classpath; please use only one
jar:file:/usr/local/spark-3.2.1-bin-hadoop3.2/jars/elasticsearch-spark-30_2.12-8.1.0.jar
jar:file:/usr/local/spark-3.2.1-bin-hadoop3.2/jars/elasticsearch-hadoop-8.1.1.jar
jar:file:/usr/local/spark-3.2.1-bin-hadoop3.2/jars/elasticsearch-hadoop-hive-8.1.1.jar
jar:file:/usr/local/spark-3.2.1-bin-hadoop3.2/jars/elasticsearch-hadoop-mr-8.1.1.jar
jar:file:/usr/local/spark-3.2.1-bin-hadoop3.2/jars/elasticsearch-hadoop-pig-8.1.1.jar
jar:file:/usr/local/spark-3.2.1-bin-hadoop3.2/jars/elasticsearch-spark-20_2.11-8.1.1.jar
jar:file:/usr/local/spark-3.2.1-bin-hadoop3.2/jars/elasticsearch-storm-8.1.1.jar

	at org.elasticsearch.hadoop.util.Version.<clinit>(Version.java:98)
	... 38 more

It's also worth noting that in Jupyter you should set
os.environ['PYSPARK_SUBMIT_ARGS'] = '--jars elasticsearch-spark-30_2.12-8.1.0.jar pyspark-shell'
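One further option worth mentioning (an assumption, not something tested in this thread): instead of baking the jar into both images, you can let Spark resolve the connector by its Maven coordinates, so the driver and executors are guaranteed to get the identical artifact, provided they can reach a Maven repository:

```python
import os

# Resolve the connector from Maven Central rather than shipping a local
# jar; Spark distributes the resolved artifact to the executors itself.
os.environ['PYSPARK_SUBMIT_ARGS'] = (
    '--packages org.elasticsearch:elasticsearch-spark-30_2.12:8.1.0 pyspark-shell'
)
```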