Elasticsearch-PySpark Problem

Hello,

I'm trying to connect PySpark with Elasticsearch so that I can manipulate Elasticsearch docs using PySpark. However, I'm currently facing issues when trying to retrieve documents. I've tried different approaches, but I'm still getting errors.

I'm using:

  • Python 3.11.7
  • pyspark 3.3.3
  • Elasticsearch 8.11.3
  • elasticsearch-hadoop-8.11.3.jar

All of this is running in a Jupyter notebook. From what I've read, elasticsearch-hadoop is compatible with Spark 3.3.3 and Elasticsearch 8.11.3; however, I keep getting the same error.
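In case it's useful, this is how the versions can be confirmed from the notebook (the Elasticsearch check assumes security is disabled so the root endpoint is reachable over plain HTTP on localhost:9200, as in my setup):

import sys
import pyspark
import requests

print(sys.version)          # 3.11.7
print(pyspark.__version__)  # 3.3.3

# Elasticsearch reports its version on the root endpoint
print(requests.get("http://localhost:9200").json()["version"]["number"])  # 8.11.3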

This is my code:

import findspark
findspark.init()

from pyspark.sql import SparkSession

# Path to the Elasticsearch-Hadoop JAR file
es_hadoop_jar = "elasticsearch-hadoop-8.11.3/dist/elasticsearch-hadoop-8.11.3.jar"

# Initialize SparkSession
spark = SparkSession.builder \
    .appName("ElasticsearchIntegration") \
    .config("spark.jars", es_hadoop_jar) \
    .config("spark.es.nodes", "localhost") \
    .config("spark.es.port", "9200") \
    .getOrCreate()

# Load data from Elasticsearch
df = spark.read.format("es").load("index")

# Show the DataFrame
df.show()

This is the error:

Py4JJavaError: An error occurred while calling o75.load.
: java.lang.NoClassDefFoundError: scala/Product$class
	at org.elasticsearch.spark.sql.ElasticsearchRelation.<init>(DefaultSource.scala:228)
	at org.elasticsearch.spark.sql.DefaultSource.createRelation(DefaultSource.scala:97)
	at org.apache.spark.sql.execution.datasources.DataSource.resolveRelation(DataSource.scala:350)
	at org.apache.spark.sql.DataFrameReader.loadV1Source(DataFrameReader.scala:228)
	at org.apache.spark.sql.DataFrameReader.$anonfun$load$2(DataFrameReader.scala:210)
	at scala.Option.getOrElse(Option.scala:189)
	at org.apache.spark.sql.DataFrameReader.load(DataFrameReader.scala:210)
	at org.apache.spark.sql.DataFrameReader.load(DataFrameReader.scala:185)
	at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
	at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
	at java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
	at java.base/java.lang.reflect.Method.invoke(Method.java:566)
	at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
	at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357)
	at py4j.Gateway.invoke(Gateway.java:282)
	at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
	at py4j.commands.CallCommand.execute(CallCommand.java:79)
	at py4j.ClientServerConnection.waitForCommands(ClientServerConnection.java:182)
	at py4j.ClientServerConnection.run(ClientServerConnection.java:106)
	at java.base/java.lang.Thread.run(Thread.java:829)
Caused by: java.lang.ClassNotFoundException: scala.Product$class
	at java.base/java.net.URLClassLoader.findClass(URLClassLoader.java:476)
	at java.base/java.lang.ClassLoader.loadClass(ClassLoader.java:594)
	at java.base/java.lang.ClassLoader.loadClass(ClassLoader.java:527)
	... 20 more

I've read some posts indicating that this is a versioning error, but I can't pinpoint what's wrong.

Any insight would be highly appreciated.

Hi @tmslara.a. Sorry for the slow response, but I think you are probably right that this is a versioning error. The es-hadoop/spark packaging is a bit confusing: the elasticsearch-hadoop-8.11.3.jar uber jar bundles a default build of es-spark that is compiled against an older Scala version than your setup uses. The `scala.Product$class` in your stack trace is the giveaway — `$class` implementation classes only exist in Scala 2.11 and earlier, while Spark 3.3 runs on Scala 2.12 or 2.13, so the JVM can't find that class when the connector loads. You want [elasticsearch-spark-30_2.13-8.11.3.jar](https://central.sonatype.com/artifact/org.elasticsearch/elasticsearch-spark-30_2.13/8.11.3) if you are running Scala 2.13, or [elasticsearch-spark-30_2.12-8.11.3.jar](https://central.sonatype.com/artifact/org.elasticsearch/elasticsearch-spark-30_2.12/8.11.3) if you are running Scala 2.12.
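If you're not sure which Scala build your PySpark ships with, one quick way to check from the notebook is via the py4j gateway of an active session (note that `_jvm` is an internal handle, so treat this as a diagnostic rather than a supported API):

# Prints e.g. "version 2.12.15" -> use the _2.12 connector jar
print(spark.sparkContext._jvm.scala.util.Properties.versionString())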
See Issue Using the Connector from PySpark in 7.17.3 - #3 by Keith_Massey for an old post about this.
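For reference, here is your snippet adjusted to load the Scala 2.12 connector instead of the uber jar. The stock PySpark 3.3.3 wheel ships with Scala 2.12, so that is the likely match; the jar path is an assumption about where you downloaded it:

import findspark
findspark.init()

from pyspark.sql import SparkSession

# elasticsearch-spark-30_2.12 targets Spark 3.x built against Scala 2.12,
# which is what the stock PySpark 3.3.3 wheel uses.
es_spark_jar = "elasticsearch-spark-30_2.12-8.11.3.jar"  # adjust path as needed

spark = SparkSession.builder \
    .appName("ElasticsearchIntegration") \
    .config("spark.jars", es_spark_jar) \
    .config("spark.es.nodes", "localhost") \
    .config("spark.es.port", "9200") \
    .getOrCreate()

df = spark.read.format("es").load("index")
df.show()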