We are trying to index data into our Elasticsearch cluster (version 7.6.1) from Databricks using the elasticsearch-hadoop library (version 7.9.3).
For corporate compliance reasons, our Elasticsearch cluster can only be accessed through Azure API Management. The URL through which we reach the cluster therefore looks like this:
https://api.example.com/datalake/sandbox/elasticsearch-prp?version=1.0
We have successfully managed to contact our Elasticsearch cluster from our Databricks notebook using curl (please note the two authentication headers that we have to send in addition to the mandatory version URL parameter):
%sh
curl 'https://api.example.com/datalake/sandbox/elasticsearch-prp?version=1.0' \
--header 'Authorization: Bearer AUTH_TOKEN' \
--header 'Ocp-Apim-Subscription-Key: SUBSCRIPTION_KEY'
{
"name" : "dragonfly-elastic-search-cluster-es-default-2",
"cluster_name" : "dragonfly-elastic-search-cluster",
... truncated for brevity ...
"tagline" : "You Know, for Search"
}
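For reference, the same request expressed in Python from a notebook cell (a minimal requests equivalent of the curl call above; AUTH_TOKEN holds the full "Bearer ..." header value and SUBSCRIPTION_KEY the APIM subscription key):

import requests

# Mandatory version parameter plus the two authentication headers
# required by Azure API Management:
resp = requests.get(
    "https://api.example.com/datalake/sandbox/elasticsearch-prp",
    params={"version": "1.0"},
    headers={
        "Authorization": AUTH_TOKEN,  # full header value, i.e. "Bearer <token>"
        "Ocp-Apim-Subscription-Key": SUBSCRIPTION_KEY,
    },
)
print(resp.status_code, resp.json()["cluster_name"])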
Now we are trying to index some data into Elasticsearch from the same notebook using the library (please note how we supply both the path prefix and the headers):
esURL = "api.example.com"  # APIM gateway host; path and query string are configured below

df.write \
  .format("org.elasticsearch.spark.sql") \
  .option("es.nodes.wan.only", "true") \
  .option("es.port", "443") \
  .option("es.net.ssl", "true") \
  .option("es.nodes", esURL) \
  .option("es.nodes.path.prefix", "/datalake/sandbox/elasticsearch-prp?version=1.0") \
  .option("es.net.http.header.Authorization", AUTH_TOKEN) \
  .option("es.net.http.header.Ocp-Apim-Subscription-Key", SUBSCRIPTION_KEY) \
  .mode("Overwrite") \
  .save("index/test")
This fails with the notorious "Cannot detect ES version" error:
org.elasticsearch.hadoop.EsHadoopIllegalArgumentException: Cannot detect ES version - typically this happens if the network/Elasticsearch cluster is not accessible or when targeting a WAN/Cloud instance without the proper setting 'es.nodes.wan.only'
at org.elasticsearch.hadoop.rest.InitializationUtils.discoverClusterInfo(InitializationUtils.java:348)
at org.elasticsearch.spark.sql.ElasticsearchRelation.cfg$lzycompute(DefaultSource.scala:225)
at org.elasticsearch.spark.sql.ElasticsearchRelation.cfg(DefaultSource.scala:223)
at org.elasticsearch.spark.sql.ElasticsearchRelation.insert(DefaultSource.scala:603)
at org.elasticsearch.spark.sql.DefaultSource.createRelation(DefaultSource.scala:108)
at org.apache.spark.sql.execution.datasources.SaveIntoDataSourceCommand.run(SaveIntoDataSourceCommand.scala:45)
at org.apache.spark.sql.execution.command.ExecutedCommandExec.sideEffectResult$lzycompute(commands.scala:70)
at org.apache.spark.sql.execution.command.ExecutedCommandExec.sideEffectResult(commands.scala:68)
at org.apache.spark.sql.execution.command.ExecutedCommandExec.doExecute(commands.scala:86)
at org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$1.apply(SparkPlan.scala:147)
at org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$1.apply(SparkPlan.scala:135)
at org.apache.spark.sql.execution.SparkPlan$$anonfun$executeQuery$5.apply(SparkPlan.scala:188)
at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
at org.apache.spark.sql.execution.SparkPlan.executeQuery(SparkPlan.scala:184)
at org.apache.spark.sql.execution.SparkPlan.execute(SparkPlan.scala:135)
at org.apache.spark.sql.execution.QueryExecution.toRdd$lzycompute(QueryExecution.scala:117)
at org.apache.spark.sql.execution.QueryExecution.toRdd(QueryExecution.scala:115)
at org.apache.spark.sql.DataFrameWriter$$anonfun$runCommand$1.apply(DataFrameWriter.scala:711)
at org.apache.spark.sql.DataFrameWriter$$anonfun$runCommand$1.apply(DataFrameWriter.scala:711)
at org.apache.spark.sql.execution.SQLExecution$$anonfun$withCustomExecutionEnv$1.apply(SQLExecution.scala:113)
at org.apache.spark.sql.execution.SQLExecution$.withSQLConfPropagated(SQLExecution.scala:243)
at org.apache.spark.sql.execution.SQLExecution$.withCustomExecutionEnv(SQLExecution.scala:99)
at org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:173)
at org.apache.spark.sql.DataFrameWriter.runCommand(DataFrameWriter.scala:711)
at org.apache.spark.sql.DataFrameWriter.saveToV1Source(DataFrameWriter.scala:307)
at org.apache.spark.sql.DataFrameWriter.save(DataFrameWriter.scala:293)
at org.apache.spark.sql.DataFrameWriter.save(DataFrameWriter.scala:235)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:498)
at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:380)
at py4j.Gateway.invoke(Gateway.java:295)
at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
at py4j.commands.CallCommand.execute(CallCommand.java:79)
at py4j.GatewayConnection.run(GatewayConnection.java:251)
at java.lang.Thread.run(Thread.java:748)
Caused by: org.elasticsearch.hadoop.rest.EsHadoopNoNodesLeftException: Connection error (check network and/or proxy settings)- all nodes failed; tried [[api.example.com:443]]
at org.elasticsearch.hadoop.rest.NetworkClient.execute(NetworkClient.java:160)
at org.elasticsearch.hadoop.rest.RestClient.execute(RestClient.java:432)
at org.elasticsearch.hadoop.rest.RestClient.execute(RestClient.java:428)
at org.elasticsearch.hadoop.rest.RestClient.execute(RestClient.java:388)
at org.elasticsearch.hadoop.rest.RestClient.execute(RestClient.java:392)
at org.elasticsearch.hadoop.rest.RestClient.get(RestClient.java:168)
at org.elasticsearch.hadoop.rest.RestClient.mainInfo(RestClient.java:745)
at org.elasticsearch.hadoop.rest.InitializationUtils.discoverClusterInfo(InitializationUtils.java:338)
... 37 more
Upon inspecting the elasticsearch-hadoop logs, we noticed this ERROR entry:
20/11/11 13:29:24 ERROR NetworkClient: Node [api.example.com:443] failed (Path has query portion on it: [/datalake/sandbox/elasticsearch-prp?version=1.0/]); no other nodes left - aborting...
We then checked the library's source code, and indeed it does not allow URL parameters in es.nodes.path.prefix: it checks for a "?" in that property and throws the exception above.
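In Python terms, the guard behaves roughly like this (our paraphrase of the observed behaviour, not the library's actual source code):

# Paraphrased reproduction of the es.nodes.path.prefix validation,
# based on the ERROR log above:
def validate_path_prefix(prefix):
    if "?" in prefix:
        raise ValueError("Path has query portion on it: [%s]" % prefix)

validate_path_prefix("/datalake/sandbox/elasticsearch-prp?version=1.0/")  # raises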
As a workaround, we then tried supplying the ?version=1.0 portion of the URL via the es.query property instead; the exact attempt is shown below.
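Concretely, the attempt looked like this (the write above, with the query portion moved from es.nodes.path.prefix into es.query):

df.write \
  .format("org.elasticsearch.spark.sql") \
  .option("es.nodes.wan.only", "true") \
  .option("es.port", "443") \
  .option("es.net.ssl", "true") \
  .option("es.nodes", esURL) \
  .option("es.nodes.path.prefix", "/datalake/sandbox/elasticsearch-prp") \
  .option("es.query", "?version=1.0") \
  .option("es.net.http.header.Authorization", AUTH_TOKEN) \
  .option("es.net.http.header.Ocp-Apim-Subscription-Key", SUBSCRIPTION_KEY) \
  .mode("Overwrite") \
  .save("index/test")

This failed with: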
org.elasticsearch.hadoop.EsHadoopIllegalArgumentException: Cannot detect ES version - typically this happens if the network/Elasticsearch cluster is not accessible or when targeting a WAN/Cloud instance without the proper setting 'es.nodes.wan.only'
at org.elasticsearch.hadoop.rest.InitializationUtils.discoverClusterInfo(InitializationUtils.java:348)
at org.elasticsearch.spark.sql.ElasticsearchRelation.cfg$lzycompute(DefaultSource.scala:225)
at org.elasticsearch.spark.sql.ElasticsearchRelation.cfg(DefaultSource.scala:223)
at org.elasticsearch.spark.sql.ElasticsearchRelation.insert(DefaultSource.scala:603)
at org.elasticsearch.spark.sql.DefaultSource.createRelation(DefaultSource.scala:108)
at org.apache.spark.sql.execution.datasources.SaveIntoDataSourceCommand.run(SaveIntoDataSourceCommand.scala:45)
at org.apache.spark.sql.execution.command.ExecutedCommandExec.sideEffectResult$lzycompute(commands.scala:70)
at org.apache.spark.sql.execution.command.ExecutedCommandExec.sideEffectResult(commands.scala:68)
at org.apache.spark.sql.execution.command.ExecutedCommandExec.doExecute(commands.scala:86)
at org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$1.apply(SparkPlan.scala:147)
at org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$1.apply(SparkPlan.scala:135)
at org.apache.spark.sql.execution.SparkPlan$$anonfun$executeQuery$5.apply(SparkPlan.scala:188)
at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
at org.apache.spark.sql.execution.SparkPlan.executeQuery(SparkPlan.scala:184)
at org.apache.spark.sql.execution.SparkPlan.execute(SparkPlan.scala:135)
at org.apache.spark.sql.execution.QueryExecution.toRdd$lzycompute(QueryExecution.scala:117)
at org.apache.spark.sql.execution.QueryExecution.toRdd(QueryExecution.scala:115)
at org.apache.spark.sql.DataFrameWriter$$anonfun$runCommand$1.apply(DataFrameWriter.scala:711)
at org.apache.spark.sql.DataFrameWriter$$anonfun$runCommand$1.apply(DataFrameWriter.scala:711)
at org.apache.spark.sql.execution.SQLExecution$$anonfun$withCustomExecutionEnv$1.apply(SQLExecution.scala:113)
at org.apache.spark.sql.execution.SQLExecution$.withSQLConfPropagated(SQLExecution.scala:243)
at org.apache.spark.sql.execution.SQLExecution$.withCustomExecutionEnv(SQLExecution.scala:99)
at org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:173)
at org.apache.spark.sql.DataFrameWriter.runCommand(DataFrameWriter.scala:711)
at org.apache.spark.sql.DataFrameWriter.saveToV1Source(DataFrameWriter.scala:307)
at org.apache.spark.sql.DataFrameWriter.save(DataFrameWriter.scala:293)
at org.apache.spark.sql.DataFrameWriter.save(DataFrameWriter.scala:235)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:498)
at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:380)
at py4j.Gateway.invoke(Gateway.java:295)
at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
at py4j.commands.CallCommand.execute(CallCommand.java:79)
at py4j.GatewayConnection.run(GatewayConnection.java:251)
at java.lang.Thread.run(Thread.java:748)
Caused by: org.elasticsearch.hadoop.rest.EsHadoopInvalidRequest: null
null
at org.elasticsearch.hadoop.rest.RestClient.checkResponse(RestClient.java:477)
at org.elasticsearch.hadoop.rest.RestClient.execute(RestClient.java:434)
at org.elasticsearch.hadoop.rest.RestClient.execute(RestClient.java:428)
at org.elasticsearch.hadoop.rest.RestClient.execute(RestClient.java:388)
at org.elasticsearch.hadoop.rest.RestClient.execute(RestClient.java:392)
at org.elasticsearch.hadoop.rest.RestClient.get(RestClient.java:168)
at org.elasticsearch.hadoop.rest.RestClient.mainInfo(RestClient.java:745)
at org.elasticsearch.hadoop.rest.InitializationUtils.discoverClusterInfo(InitializationUtils.java:338)
... 37 more
Please note that it is the same exception, but this time with a different root cause:
Caused by: org.elasticsearch.hadoop.rest.EsHadoopInvalidRequest: null
null
This might be caused by the fact that our endpoint is not reachable unless the "version" URL parameter is supplied correctly; without it, we receive a 404.
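This is easy to reproduce with the requests sketch above by simply dropping the version parameter:

# Hypothetical repro: the same GET without the mandatory version
# parameter is rejected by APIM with a 404 before it ever reaches
# Elasticsearch, presumably what the library's version-detection
# GET runs into.
resp = requests.get(
    "https://api.example.com/datalake/sandbox/elasticsearch-prp",
    headers={
        "Authorization": AUTH_TOKEN,
        "Ocp-Apim-Subscription-Key": SUBSCRIPTION_KEY,
    },
)
print(resp.status_code)  # 404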
Our two concrete questions are:
- Is there any way to correctly supply URL parameters on all calls the library makes to our Elasticsearch cluster?
- If there isn't, what alternatives would you recommend so that we can still index data into our Elasticsearch cluster from Databricks/Spark?
We'd really appreciate any kind of help on the topic. Thanks in advance.