I have Elasticsearch 7.17.3 running on localhost:9200, so I downloaded Elasticsearch for Apache Hadoop 7.17.3 and copied elasticsearch-spark-20_2.11-7.17.3.jar to my jar folder.
I read "Using the connector from PySpark" and was able to read data from Elasticsearch by adapting the first snippet, which uses newAPIHadoopRDD, as shown below:
conf = { "es.resource" : "myindex" }
es_rdd = sc.newAPIHadoopRDD(
    inputFormatClass="org.elasticsearch.hadoop.mr.EsInputFormat",
    keyClass="org.apache.hadoop.io.NullWritable",
    valueClass="org.elasticsearch.hadoop.mr.LinkedMapWritable",
    conf=conf,
)
es_rdd.first()
However, when I follow the second snippet, which uses org.elasticsearch.spark.sql, and run the code below, I get a NoClassDefFoundError (the stack trace follows the code):
from pyspark.sql import SQLContext
sqlContext = SQLContext(sc)
df = sqlContext.read.format("org.elasticsearch.spark.sql").load("myindex")
df.printSchema()
Py4JJavaError: An error occurred while calling o88.load.
: java.lang.NoClassDefFoundError: scala/Product$class
at org.elasticsearch.spark.sql.ElasticsearchRelation.<init>(DefaultSource.scala:221)
at org.elasticsearch.spark.sql.DefaultSource.createRelation(DefaultSource.scala:97)
at org.apache.spark.sql.execution.datasources.DataSource.resolveRelation(DataSource.scala:350)
at org.apache.spark.sql.DataFrameReader.loadV1Source(DataFrameReader.scala:274)
at org.apache.spark.sql.DataFrameReader.$anonfun$load$3(DataFrameReader.scala:245)
at scala.Option.getOrElse(Option.scala:189)
at org.apache.spark.sql.DataFrameReader.load(DataFrameReader.scala:245)
at org.apache.spark.sql.DataFrameReader.load(DataFrameReader.scala:188)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:498)
at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357)
at py4j.Gateway.invoke(Gateway.java:282)
at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
at py4j.commands.CallCommand.execute(CallCommand.java:79)
at py4j.ClientServerConnection.waitForCommands(ClientServerConnection.java:182)
at py4j.ClientServerConnection.run(ClientServerConnection.java:106)
at java.lang.Thread.run(Thread.java:748)
What am I doing wrong? For your information, I am using PySpark 3.2.1.
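In case it helps with diagnosis, here is a minimal sketch for comparing the Scala version encoded in the connector jar's file name with the one the Spark runtime reports. The helper names scala_suffix and binary_version are hypothetical, and reading the `_2.11` part of the file name as the Scala binary version is an assumption based on the usual naming convention for Scala artifacts. The Spark-side check is left as a comment because it needs a live session where `sc` is the active SparkContext.

```python
import re


def scala_suffix(jar_name):
    """Return the Scala binary version embedded in a jar file name, or None.

    Assumes the conventional '<artifact>_<scala-binary-version>-<version>.jar' layout.
    """
    match = re.search(r"_(\d+\.\d+)-", jar_name)
    return match.group(1) if match else None


def binary_version(full_version):
    """Reduce a full Scala version such as '2.12.15' to its binary version '2.12'."""
    return ".".join(full_version.split(".")[:2])


jar_scala = scala_suffix("elasticsearch-spark-20_2.11-7.17.3.jar")
print(jar_scala)

# In a live PySpark session (assumption: `sc` is the active SparkContext), the
# Scala version of the running JVM can be read through the Py4J gateway:
#   spark_scala = binary_version(sc._jvm.scala.util.Properties.versionNumberString())
#   print(jar_scala == spark_scala)
```

If the two values differ, that would at least show whether the jar and the Spark build target the same Scala version.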