Hi, I have set up the latest version of the Elastic Stack (8.15.2) on my local system.
I want to connect PySpark to Elasticsearch, read the data, and process it in a pipeline for further transformation and loading. For reading the data I have written this script:
```python
from pyspark.sql import SparkSession

jar_path = "file:///C:/Users/ashut/OneDrive/Desktop/nuodata/local/jars/elasticsearch-spark-30_2.12-8.15.2.jar"

spark = SparkSession.builder \
    .appName("PySpark Elasticsearch Read (Local)") \
    .config("spark.jars", jar_path) \
    .config("spark.serializer", "org.apache.spark.serializer.KryoSerializer") \
    .getOrCreate()

es_index = "myindex/doc"

df = spark.read \
    .format("org.elasticsearch.spark.sql") \
    .option("es.port", "9200") \
    .option("es.nodes", "localhost") \
    .option("es.nodes.wan.only", "true") \
    .option("es.net.http.auth.user", "elastic") \
    .option("es.net.http.auth.pass", "E-Pq*JknqQiWdU+XzD=9") \
    .load(es_index)

df.show(truncate=False)
```
Spark version: 3.5.1
Scala version: 2.12.18
Java version: 17.0.10
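Side note: one thing I was unsure about while writing the script is the resource string. My understanding (an assumption on my part) is that mapping types were removed in Elasticsearch 8.x, so `es_index` should probably be the bare index name rather than the old `"index/type"` form:

```python
# Assumption: mapping types are gone in ES 8.x, so the resource passed to
# .load() should be just the index name, not "index/type".
old_resource = "myindex/doc"               # what my script currently uses
new_resource = old_resource.split("/")[0]  # keep only the index name
print(new_resource)  # -> myindex
```

I don't think this alone explains the connection failure below, but I'd like to confirm it.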
When I execute the script, I get the following error:

```
ERROR NetworkClient: Node [localhost:9200] failed (org.elasticsearch.hadoop.thirdparty.apache.commons.httpclient.NoHttpResponseException: The server localhost failed to respond); no other nodes left - aborting...
Traceback (most recent call last):
  File "c:\Users\ashut\OneDrive\Desktop\nuodata\Tasks\cassandra\elastic_script.py", line 22, in <module>
    .load(es_index)
  File "C:\Python310\lib\site-packages\pyspark\sql\readwriter.py", line 300, in load
    return self._df(self._jreader.load(path))
  File "C:\Python310\lib\site-packages\py4j\java_gateway.py", line 1322, in __call__
    return_value = get_return_value(
  File "C:\Python310\lib\site-packages\pyspark\errors\exceptions\captured.py", line 169, in deco
    return f(*a, **kw)
  File "C:\Python310\lib\site-packages\py4j\protocol.py", line 326, in get_return_value
    raise Py4JJavaError(
py4j.protocol.Py4JJavaError: An error occurred while calling o34.load.
: org.elasticsearch.hadoop.EsHadoopIllegalArgumentException: Cannot detect ES version - typically this happens if the network/Elasticsearch cluster is not accessible or when targeting a WAN/Cloud instance without the proper setting 'es.nodes.wan.only'
	at org.elasticsearch.hadoop.rest.InitializationUtils.discoverClusterInfo(InitializationUtils.java:403)
	at org.elasticsearch.spark.sql.ElasticsearchRelation.cfg$lzycompute(DefaultSource.scala:234)
	at org.elasticsearch.spark.sql.ElasticsearchRelation.cfg(DefaultSource.scala:231)
	at org.elasticsearch.spark.sql.ElasticsearchRelation.lazySchema$lzycompute(DefaultSource.scala:238)
	at org.elasticsearch.spark.sql.ElasticsearchRelation.lazySchema(DefaultSource.scala:238)
	at org.elasticsearch.spark.sql.ElasticsearchRelation.$anonfun$schema$1(DefaultSource.scala:242)
	at scala.Option.getOrElse(Option.scala:189)
	at org.elasticsearch.spark.sql.ElasticsearchRelation.schema(DefaultSource.scala:242)
	at org.apache.spark.sql.execution.datasources.DataSource.resolveRelation(DataSource.scala:434)
	at org.apache.spark.sql.DataFrameReader.loadV1Source(DataFrameReader.scala:229)
	at org.apache.spark.sql.DataFrameReader.$anonfun$load$2(DataFrameReader.scala:211)
	at scala.Option.getOrElse(Option.scala:189)
	at org.apache.spark.sql.DataFrameReader.load(DataFrameReader.scala:211)
	at org.apache.spark.sql.DataFrameReader.load(DataFrameReader.scala:186)
	at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
	at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:77)
	at java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
	at java.base/java.lang.reflect.Method.invoke(Method.java:568)
	at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
	at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:374)
	at py4j.Gateway.invoke(Gateway.java:282)
	at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
	at py4j.commands.CallCommand.execute(CallCommand.java:79)
	at py4j.ClientServerConnection.waitForCommands(ClientServerConnection.java:182)
	at py4j.ClientServerConnection.run(ClientServerConnection.java:106)
	at java.base/java.lang.Thread.run(Thread.java:842)
Caused by: org.elasticsearch.hadoop.rest.EsHadoopNoNodesLeftException: Connection error (check network and/or proxy settings)- all nodes failed; tried [[localhost:9200]]
	at org.elasticsearch.hadoop.rest.NetworkClient.execute(NetworkClient.java:160)
	at org.elasticsearch.hadoop.rest.RestClient.execute(RestClient.java:442)
	at org.elasticsearch.hadoop.rest.RestClient.execute(RestClient.java:438)
	at org.elasticsearch.hadoop.rest.RestClient.execute(RestClient.java:406)
	at org.elasticsearch.hadoop.rest.RestClient.mainInfo(RestClient.java:755)
	at org.elasticsearch.hadoop.rest.InitializationUtils.discoverClusterInfo(InitializationUtils.java:393)
	... 25 more
```
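My hedged guess: Elasticsearch 8.x enables security (TLS) by default, so the connector may be speaking plain HTTP to a node that only answers HTTPS, which would match the "server localhost failed to respond" message. These are the extra SSL options I believe I need to add (option names taken from the es-hadoop configuration reference; treating the self-signed flag as an assumption for a default local install, untested on my side):

```python
# Hedged sketch: SSL settings for the es-hadoop connector against a
# default local 8.x node. The self-signed-cert flag is an assumption;
# a real setup may instead need es.net.ssl.truststore.* options.
ssl_options = {
    "es.nodes": "localhost",
    "es.port": "9200",
    "es.nodes.wan.only": "true",
    "es.net.ssl": "true",                         # talk HTTPS to the node
    "es.net.ssl.cert.allow.self.signed": "true",  # accept the default self-signed cert
}

# Print them in the form they would be chained onto spark.read
for key, value in ssl_options.items():
    print(f'.option("{key}", "{value}")')
```

Does this look like the right direction, or is there something else I should check first?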