Hello,
I'm trying to connect PySpark to Elasticsearch so that I can manipulate Elasticsearch documents from PySpark, but I keep hitting errors whenever I try to retrieve documents. I've tried several approaches without success.
I'm using:
- Python 3.11.7
- pyspark 3.3.3
- Elasticsearch 8.11.3
- elasticsearch-hadoop-8.11.3.jar
All of this is running in a Jupyter Notebook. From what I've read, elasticsearch-hadoop should be compatible with Spark 3.3.3 and Elasticsearch 8.11.3, yet I get the following error every time.
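For what it's worth, my understanding is that the Spark-specific connector artifacts encode the Spark line and Scala version they were built against in the file name (e.g. elasticsearch-spark-30_2.12-8.11.3.jar is the Spark 3.x build for Scala 2.12), while the all-in-one elasticsearch-hadoop jar name carries no such marker. A small sketch of the sanity check I had in mind (the helper name and the regex are mine, not from any library):

```python
import re

def parse_connector_jar(filename):
    """Extract (spark_marker, scala_version, es_version) from a
    Spark-specific elasticsearch connector jar name, or return None
    if the name does not follow the elasticsearch-spark-XY_A.B-V.jar
    pattern (e.g. the uber elasticsearch-hadoop jar)."""
    m = re.match(r"elasticsearch-spark-(\d+)_(\d+\.\d+)-([\d.]+)\.jar$", filename)
    if not m:
        return None
    # e.g. marker "30" denotes the Spark 3.x line of the connector
    return m.group(1), m.group(2), m.group(3)

# The Spark 3.x / Scala 2.12 build parses as expected:
print(parse_connector_jar("elasticsearch-spark-30_2.12-8.11.3.jar"))
# → ('30', '2.12', '8.11.3')

# The all-in-one uber jar has no Spark/Scala marker in its name:
print(parse_connector_jar("elasticsearch-hadoop-8.11.3.jar"))
# → None
```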
This is my code:
import findspark
findspark.init()
from pyspark.sql import SparkSession
# Path to the Elasticsearch-Hadoop JAR file
es_hadoop_jar = "elasticsearch-hadoop-8.11.3/dist/elasticsearch-hadoop-8.11.3.jar"
# Initialize SparkSession
spark = SparkSession.builder \
    .appName("ElasticsearchIntegration") \
    .config("spark.jars", es_hadoop_jar) \
    .config("spark.es.nodes", "localhost") \
    .config("spark.es.port", "9200") \
    .getOrCreate()
# Load data from Elasticsearch
df = spark.read.format("es").load("index")
# Show the DataFrame
df.show()
This is the error:
Py4JJavaError: An error occurred while calling o75.load.
: java.lang.NoClassDefFoundError: scala/Product$class
at org.elasticsearch.spark.sql.ElasticsearchRelation.<init>(DefaultSource.scala:228)
at org.elasticsearch.spark.sql.DefaultSource.createRelation(DefaultSource.scala:97)
at org.apache.spark.sql.execution.datasources.DataSource.resolveRelation(DataSource.scala:350)
at org.apache.spark.sql.DataFrameReader.loadV1Source(DataFrameReader.scala:228)
at org.apache.spark.sql.DataFrameReader.$anonfun$load$2(DataFrameReader.scala:210)
at scala.Option.getOrElse(Option.scala:189)
at org.apache.spark.sql.DataFrameReader.load(DataFrameReader.scala:210)
at org.apache.spark.sql.DataFrameReader.load(DataFrameReader.scala:185)
at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
at java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.base/java.lang.reflect.Method.invoke(Method.java:566)
at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357)
at py4j.Gateway.invoke(Gateway.java:282)
at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
at py4j.commands.CallCommand.execute(CallCommand.java:79)
at py4j.ClientServerConnection.waitForCommands(ClientServerConnection.java:182)
at py4j.ClientServerConnection.run(ClientServerConnection.java:106)
at java.base/java.lang.Thread.run(Thread.java:829)
Caused by: java.lang.ClassNotFoundException: scala.Product$class
at java.base/java.net.URLClassLoader.findClass(URLClassLoader.java:476)
at java.base/java.lang.ClassLoader.loadClass(ClassLoader.java:594)
at java.base/java.lang.ClassLoader.loadClass(ClassLoader.java:527)
... 20 more
I've read some posts indicating that this is a versioning error, but I can't pinpoint the mismatch myself.
Any insight would be highly appreciated.