How do I connect PySpark to Elasticsearch with SSL and verify certs set to False?


(Brett Anderson) #1

Previously I have successfully connected to an Elasticsearch cluster directly from Python with the following code:

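import ssl

from elasticsearch import Elasticsearch
from elasticsearch.connection import create_ssl_context

# Build an SSL context that skips hostname and certificate verification.
ssl_context = create_ssl_context()
ssl_context.check_hostname = False
ssl_context.verify_mode = ssl.CERT_NONE

# ES_HOST, ES_PORT, ES_USERNAME and ES_PASSWORD are defined elsewhere.
es = Elasticsearch(
    ES_HOST,
    http_auth=(ES_USERNAME, ES_PASSWORD),
    scheme="https",
    port=ES_PORT,
    use_ssl=True,
    verify_certs=False,
    ssl_context=ssl_context,
    ca_certs=False
)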

Now I'm trying to connect to the same cluster using the PySpark-to-Elasticsearch connector. Spark 2.4.0 has been set up with Hadoop 2.7, and I'm using elasticsearch-hadoop-6.1.1 to connect the two.

I use the following configuration to connect PySpark with ES:

es_live_conf = {
    "es.nodes": ES_HOST,
    "es.port": ES_PORT,
    "es.resource": "testindex/testdoc",
    "es.net.http.auth.user": ES_USERNAME,
    "es.net.http.auth.pass": ES_PASSWORD,
    "es.net.ssl": "true",
    "es.nodes.resolve.hostname": "false",
    "es.net.ssl.cert.allow.self.signed": "true"
}

Then I use this code to open the connection:

from pyspark import SparkContext

sc = SparkContext(appName="PythonSparkStreaming")
sc.setLogLevel("WARN")

# Read the index as an RDD through the elasticsearch-hadoop input format.
es_rdd = sc.newAPIHadoopRDD(
    inputFormatClass="org.elasticsearch.hadoop.mr.EsInputFormat",
    keyClass="org.apache.hadoop.io.NullWritable",
    valueClass="org.elasticsearch.hadoop.mr.LinkedMapWritable",
    conf=es_live_conf)

I then get an error as it tries each node. I've added the error as a comment since I exceeded the character limit when it was part of this post.

I should note that the same code works successfully with a configuration that connects to a local ES cluster, so the error appears to be specific to the SSL connection to the remote cluster. I'm able to access the cluster from the client environment with the command curl --insecure --user ES_USER:ES_PASS -XGET 'https://ES_HOST:ES_PORT/', so it seems that the client should be able to connect.

How do I set up the Spark ES connection with the correct SSL details? I've tried removing the es.net.ssl.cert.allow.self.signed and es.nodes.resolve.hostname properties from the configuration but still receive the same error.


(Brett Anderson) #2
19/02/07 07:00:31 DEBUG CommonsHttpTransportFactory: Creating new CommonsHttpTransport
19/02/07 07:00:31 DEBUG CommonsHttpTransport: SSL Connection enabled
19/02/07 07:00:31 INFO CommonsHttpTransport: Using detected HTTP Auth credentials...
19/02/07 07:00:31 TRACE CommonsHttpTransport: Opening HTTP transport to xx.xx.xx.xx:xxxxx
19/02/07 07:00:31 DEBUG HeaderProcessor: Added HTTP Headers to method: [Content-Type: application/json
, Accept: application/json
]
19/02/07 07:00:31 TRACE CommonsHttpTransport:Tx [HEAD]@[xx.xx.xx.xx:xxxxx][testindex]?[null] w/ payload [null]
19/02/07 07:00:31 DEBUG SSLSocketFactory: No keystore location specified! SSL is continuing with no keystore.
19/02/07 07:00:31 DEBUG SSLSocketFactory: No truststore location specified! SSL is continuing with no truststore.
19/02/07 07:00:35 TRACE CommonsHttpTransport: Rx @[xx.xx.xx.xx] [404-Not Found] [null]
19/02/07 07:00:35 TRACE CommonsHttpTransport: Closing HTTP transport to xx.xx.xx.xx:xxxxx
19/02/07 07:00:35 DEBUG CommonsHttpTransportFactory: Creating new CommonsHttpTransport
19/02/07 07:00:35 DEBUG CommonsHttpTransport: SSL Connection enabled
19/02/07 07:00:35 INFO CommonsHttpTransport: Using detected HTTP Auth credentials...
19/02/07 07:00:35 TRACE CommonsHttpTransport: Opening HTTP transport to xx.xx.xx.xx:xxxxx
19/02/07 07:00:35 DEBUG HeaderProcessor: Added HTTP Headers to method: [Content-Type: application/json
, Accept: application/json
]
19/02/07 07:00:35 TRACE CommonsHttpTransport: Tx [GET]@[xx.xx.xx.xx:xxxxx][]?[null] w/ payload [null]
19/02/07 07:00:35 DEBUG SSLSocketFactory: No keystore location specified! SSL is continuing with no keystore.
19/02/07 07:00:35 DEBUG SSLSocketFactory: No truststore location specified! SSL is continuing with no truststore.
19/02/07 07:00:39 TRACE CommonsHttpTransport: Rx @[yyy.yy.y.y] [200-OK] [{
  "name" : "iad1esapp2vz742",
  "cluster_name" : "68fc89bc2c36e7188782e4f226ed3948",
  "cluster_uuid" : "7xk6py25R_mRgLgLTYeavg",
  "version" : {
    "number" : "5.6.5",
    "build_hash" : "6a37571",
    "build_date" : "2017-12-04T07:50:10.466Z",
    "build_snapshot" : false,
    "lucene_version" : "6.6.1"
  },
  "tagline" : "You Know, for Search"
}
]
19/02/07 07:00:39 TRACE CommonsHttpTransport: Closing HTTP transport to xx.xx.xx.xx:xxxxx
19/02/07 07:00:39 DEBUG CommonsHttpTransportFactory: Creating new CommonsHttpTransport
19/02/07 07:00:39 DEBUG CommonsHttpTransport: SSL Connection enabled
19/02/07 07:00:39 INFO CommonsHttpTransport: Using detected HTTP Auth credentials...
19/02/07 07:00:39 TRACE CommonsHttpTransport: Opening HTTP transport to xx.xx.xx.xx:xxxxx
19/02/07 07:00:39 DEBUG HeaderProcessor: Added HTTP Headers to method: [Content-Type: application/json
, Accept: application/json
]
19/02/07 07:00:39 TRACE CommonsHttpTransport: Tx [GET]@[xx.xx.xx.xx:xxxxx][_nodes/http]?[null] w/ payload [null]
19/02/07 07:00:39 DEBUG SSLSocketFactory: No keystore location specified! SSL is continuing with no keystore.
19/02/07 07:00:39 DEBUG SSLSocketFactory: No truststore location specified! SSL is continuing with no truststore.

(Brett Anderson) #3

19/02/07 07:00:43 TRACE CommonsHttpTransport: Rx @[yyy.yy.y.y] [200-OK] [{"_nodes":{"total":14,"successful":14,"failed":0},"cluster_name":"68fc89bc2c36e7188782e4f226ed3948","nodes":{"VJl4scTeQbuPmnXcsb5MOA":{"name":"iad1esdn20vz166","transport_address":"xx.xx.xx.xx:xxxxx","host":"xx.xx.xx.xx","ip":"xx.xx.xx.xx","version":"5.6.5","build_hash":"6a37571","roles":["data"],"attributes":{"phy_host":"iad1esdn20"},"http":{"bound_address":["[::]:11203"],"publish_address":"xx.xx.xx.xx:xxxxx","max_content_length_in_bytes":104857600}},"MEq2hOGkSdeBMldoBiv2AQ":{"name":"iad1esdn23vz187","transport_address":"xx.xx.xx.xx:xxxxx","host":"xx.xx.xx.xx","ip":"xx.xx.xx.xx","version":"5.6.5","build_hash":"6a37571","roles":["data"],"attributes":{"phy_host":"iad1esdn23"},"http":{"bound_address":["[::]:11203"],"publish_address":"xx.xx.xx.xx:xxxxx","max_content_length_in_bytes":104857600}},"AHDBAMeQTg2LMyhvnxYxeQ":{"name":"iad1esmst4vz277","transport_address":"xx.xx.xx.xx:xxxxx","host":"xx.xx.xx.xx","ip":"xx.xx.xx.xx","version":"5.6.5","build_hash":"6a37571","roles":["master"],"attributes":{"phy_host":"iad1esmst4"},"http":{"bound_address":["[::]:11203"],"publish_address":"xx.xx.xx.xx:xxxxx","max_content_length_in_bytes":104857600}},"nJjtbzRuTP2Bkp9E0EOLBw":{"name":"iad1esapp0vz755","transport_address":"xx.xx.xx.xx:xxxxx","host":"xx.xx.xx.xx","ip":"xx.xx.xx.xx","version":"5.6.5","build_hash":"6a37571","roles":["ingest"],"attributes":{"phy_host":"iad1esapp0"},"http":{"bound_address":["[::]:9200"],"publish_address":"xx.xx.xx.xx:xxxxx","max_content_length_in_bytes":104857600}},"5PrI09xWQRuNHPPUYCKOcQ":{"name":"iad1esdn31vz12","transport_address":"xx.xx.xx.xx:xxxxx","host":"xx.xx.xx.xx","ip":"xx.xx.xx.xx","version":"5.6.5","build_hash":"6a37571","roles":["data"],"attributes":{"phy_host":"iad1esdn31"},"http":{"bound_address":["[::]:11203"],"publish_address":"xx.xx.xx.xx:xxxxx","max_content_length_in_bytes":104857600}},"f1l6borzQt6-d_QEy9hY9Q":{"name":"iad1esapp1vz782","transport_address":"xx.xx.xx.
xx:xxxxx","host":"xx.xx.xx.xx","ip":"xx.xx.xx.xx","version":"5.6.5","build_hash":"6a37571","roles":["ingest"],"attributes":{"phy_host":"iad1esapp1"},"http":{"bound_address":["[::]:9200"],"publish_address":"xx.xx.xx.xx:xxxxx","max_content_length_in_bytes":104857600}},"bO6AoGXFSgGzrISe-pCW8g":{"name":"iad1esdn21vz99","transport_address":"xx.xx.xx.xx:xxxxx","host":"xx.xx.xx.xx","ip":"xx.xx.xx.xx","version":"5.6.5","build_hash":"6a37571","roles":["data"],"attributes":{"phy_host":"iad1esdn21"},"http":{"bound_address":["[::]:11203"],"publish_address":"xx.xx.xx.xx:xxxxx","max_content_length_in_bytes":104857600}},"z7UA-JHlQ7SCNyUYfbwR1A":{"name":"iad1esmst5vz277","transport_address":"xx.xx.xx.xx:xxxxx","host":"xx.xx.xx.xx","ip":"xx.xx.xx.xx","version":"5.6.5","build_hash":"6a37571","roles":["master"],"attributes":{"phy_host":"iad1esmst5"},"http":{"bound_address":["[::]:11203"],"publish_address":"xx.xx.xx.xx:xxxxx","max_content_length_in_bytes":104857600}},"53RAotd0QbiE28swz3fyOg":{"name":"iad1esapp2vz742","transport_address":"xx.xx.xx.xx:xxxxx","host":"xx.xx.xx.xx","ip":"xx.xx.xx.xx","version":"5.6.5","build_hash":"6a37571","roles":["ingest"],"attributes":{"phy_host":"iad1esapp2"},"http":{"bound_address":["[::]:9200"],"publish_address":"xx.xx.xx.xx:xxxxx","max_content_length_in_bytes":104857600}},"5Lje4zCgSryonTouZMBqyA":{"name":"iad1esdn22vz156","transport_address":"xx.xx.xx.xx:xxxxx","host":"xx.xx.xx.xx","ip":"xx.xx.xx.xx","version":"5.6.5","build_hash":"6a37571","roles":["data"],"attributes":{"phy_host":"iad1esdn22"},"http":{"bound_address":["[::]:11203"],"publish_address":"xx.xx.xx.xx:xxxxx","max_content_length_in_bytes":104857600}},"YHFxFv5SQzCt9OesKD_a-g":{"name":"iad1esapp3vz748","transport_address":"xx.xx.xx.xx:xxxxx","host":"xx.xx.xx.xx","ip":"xx.xx.xx.xx","version":"5.6.5","build_hash":"6a37571","roles":["ingest"],"attributes":{"phy_host":"iad1esapp3"},"http":{"bound_address":["[::]:9200"],"publish_address":"xx.xx.xx.xx:xxxxx","max_content_length_in_bytes":1048576
00}},"yZOAHjgoSz-Qi_eNO7HHxg":{"name":"iad1esmst3vz277","transport_address":"xx.xx.xx.xx:xxxxx","host":"xx.xx.xx.xx","ip":"xx.xx.xx.xx","version":"5.6.5","build_hash":"6a37571","roles":["master"],"attributes":{"phy_host":"iad1esmst3"},"http":{"bound_address":["[::]:11203"],"publish_address":"xx.xx.xx.xx:xxxxx","max_content_length_in_bytes":104857600}},"kg9NYEJiRy60InOxIh796A":{"name":"iad1esdn1vz163","transport_address":"xx.xx.xx.xx:xxxxx","host":"xx.xx.xx.xx","ip":"xx.xx.xx.xx","version":"5.6.5","build_hash":"6a37571","roles":["data"],"attributes":{"phy_host":"iad1esdn1"},"http":{"bound_address":["[::]:11203"],"publish_address":"xx.xx.xx.xx:xxxxx","max_content_length_in_bytes":104857600}},"PvQ0PobMTaSNnzw0l03z4A":{"name":"iad1esdn13vz80","transport_address":"xx.xx.xx.xx:xxxxx","host":"xx.xx.xx.xx","ip":"xx.xx.xx.xx","version":"5.6.5","build_hash":"6a37571","roles":["data"],"attributes":{"phy_host":"iad1esdn13"},"http":{"bound_address":["[::]:11203"],"publish_address":"xx.xx.xx.xx:xxxxx","max_content_length_in_bytes":104857600}}}}]


(Brett Anderson) #4
19/02/07 07:00:43 TRACE CommonsHttpTransport: Closing HTTP transport to xx.xx.xx.xx:xxxxx
19/02/07 07:00:43 DEBUG CommonsHttpTransportFactory: Creating new CommonsHttpTransport
19/02/07 07:00:43 DEBUG CommonsHttpTransport: SSL Connection enabled
19/02/07 07:00:43 INFO CommonsHttpTransport: Using detected HTTP Auth credentials...
19/02/07 07:00:43 TRACE CommonsHttpTransport: Opening HTTP transport to xx.xx.xx.xx:xxxxx
19/02/07 07:00:43 DEBUG HeaderProcessor: Added HTTP Headers to method: [Content-Type: application/json
, Accept: application/json
]
19/02/07 07:00:43 TRACE CommonsHttpTransport: Tx [GET]@[xx.xx.xx.xx:xxxxx][_nodes/http]?[null] w/ payload [null]
19/02/07 07:00:43 DEBUG SSLSocketFactory: No keystore location specified! SSL is continuing with no keystore.
19/02/07 07:00:43 DEBUG SSLSocketFactory: No truststore location specified! SSL is continuing with no truststore.
[I 07:01:55.454 NotebookApp] Saving file at /work/sth-baseline.ipynb
[W 07:01:55.455 NotebookApp] Notebook work/sth-baseline.ipynb is not trusted
19/02/07 07:05:48 TRACE NetworkClient: Caught exception while performing request [xx.xx.xx.xx:xxxxx][_nodes/http] - falling back to the next node in line...
java.net.ConnectException: Connection refused (Connection refused)
    at java.net.PlainSocketImpl.socketConnect(Native Method)
    at java.net.AbstractPlainSocketImpl.doConnect(AbstractPlainSocketImpl.java:350)
    at java.net.AbstractPlainSocketImpl.connectToAddress(AbstractPlainSocketImpl.java:206)
    at java.net.AbstractPlainSocketImpl.connect(AbstractPlainSocketImpl.java:188)
    at java.net.SocksSocketImpl.connect(SocksSocketImpl.java:392)
    at java.net.Socket.connect(Socket.java:589)
    at sun.security.ssl.SSLSocketImpl.connect(SSLSocketImpl.java:666)
    at sun.security.ssl.SSLSocketImpl.<init>(SSLSocketImpl.java:471)
    at sun.security.ssl.SSLSocketFactoryImpl.createSocket(SSLSocketFactoryImpl.java:153)
    at org.elasticsearch.hadoop.rest.commonshttp.SSLSocketFactory.createSocket(SSLSocketFactory.java:129)
    at org.apache.commons.httpclient.HttpConnection.open(HttpConnection.java:707)
    at org.apache.commons.httpclient.HttpMethodDirector.executeWithRetry(HttpMethodDirector.java:387)
    at org.apache.commons.httpclient.HttpMethodDirector.executeMethod(HttpMethodDirector.java:171)
    at org.apache.commons.httpclient.HttpClient.executeMethod(HttpClient.java:397)
    at org.apache.commons.httpclient.HttpClient.executeMethod(HttpClient.java:323)
    at org.elasticsearch.hadoop.rest.commonshttp.CommonsHttpTransport.execute(CommonsHttpTransport.java:478)
    at org.elasticsearch.hadoop.rest.NetworkClient.execute(NetworkClient.java:112)
    at org.elasticsearch.hadoop.rest.RestClient.execute(RestClient.java:466)
    at org.elasticsearch.hadoop.rest.RestClient.execute(RestClient.java:430)
    at org.elasticsearch.hadoop.rest.RestClient.execute(RestClient.java:434)
    at org.elasticsearch.hadoop.rest.RestClient.get(RestClient.java:155)
    at org.elasticsearch.hadoop.rest.RestClient.getHttpNodes(RestClient.java:112)
    at org.elasticsearch.hadoop.rest.RestClient.getHttpDataNodes(RestClient.java:129)
    at org.elasticsearch.hadoop.rest.InitializationUtils.filterNonDataNodesIfNeeded(InitializationUtils.java:157)
    at org.elasticsearch.hadoop.rest.RestService.findPartitions(RestService.java:223)
    at org.elasticsearch.hadoop.mr.EsInputFormat.getSplits(EsInputFormat.java:405)
    at org.elasticsearch.hadoop.mr.EsInputFormat.getSplits(EsInputFormat.java:386)
    at org.apache.spark.rdd.NewHadoopRDD.getPartitions(NewHadoopRDD.scala:130)
    at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:253)
    at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:251)
    at scala.Option.getOrElse(Option.scala:121)
    at org.apache.spark.rdd.RDD.partitions(RDD.scala:251)
    at org.apache.spark.rdd.MapPartitionsRDD.getPartitions(MapPartitionsRDD.scala:49)
    at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:253)
    at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:251)
    at scala.Option.getOrElse(Option.scala:121)
    at org.apache.spark.rdd.RDD.partitions(RDD.scala:251)
    at org.apache.spark.rdd.RDD$$anonfun$take$1.apply(RDD.scala:1343)
    at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
    at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:112)
    at org.apache.spark.rdd.RDD.withScope(RDD.scala:363)
    at org.apache.spark.rdd.RDD.take(RDD.scala:1337)
    at org.apache.spark.api.python.SerDeUtil$.pairRDDToPython(SerDeUtil.scala:239)
    at org.apache.spark.api.python.PythonRDD$.newAPIHadoopRDD(PythonRDD.scala:302)
    at org.apache.spark.api.python.PythonRDD.newAPIHadoopRDD(PythonRDD.scala)
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke(Method.java:498)
    at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
    at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357)
    at py4j.Gateway.invoke(Gateway.java:282)
    at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
    at py4j.commands.CallCommand.execute(CallCommand.java:79)
    at py4j.GatewayConnection.run(GatewayConnection.java:238)
    at java.lang.Thread.run(Thread.java:748)
19/02/07 07:05:48 TRACE CommonsHttpTransport: Closing HTTP transport to xx.xx.xx.xx:xxxxx
19/02/07 07:05:48 DEBUG CommonsHttpTransportFactory: Creating new CommonsHttpTransport
19/02/07 07:05:48 DEBUG CommonsHttpTransport: SSL Connection enabled
19/02/07 07:05:48 INFO CommonsHttpTransport: Using detected HTTP Auth credentials...
19/02/07 07:05:48 TRACE CommonsHttpTransport: Opening HTTP transport to xx.xx.xx.xx:xxxxx
19/02/07 07:05:48 ERROR NetworkClient: Node [xx.xx.xx.xx:xxxxx] failed (Connection refused (Connection refused)); selected next node [xx.xx.xx.xx:xxxxx]

(Brett Anderson) #5

Ok, figured it out.

The answer is detailed here: https://www.elastic.co/guide/en/elasticsearch/hadoop/master/cloud.html

Briefly, as the logs posted above show, the client is able to connect to the cluster and get back a list of nodes. Once it has that list, it tries to connect to each node directly, but they all sit on a private network and are inaccessible from the client. Essentially, you need to turn off the ability to connect directly to the cluster nodes, though it's noted that this can affect performance. The setting to do this is "es.nodes.wan.only": "true". I also set "es.nodes.discovery": "false" while figuring this out; the discovery setting probably isn't necessary, but I'll leave it here in case it's useful.
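
For anyone who wants to see the two settings in context, here is a minimal sketch of the configuration from the question with them added. The helper name build_es_conf and the example host, port, and credentials are placeholders of my own, not part of elasticsearch-hadoop, and I'm assuming the connector wants every value as a string:

```python
def build_es_conf(host, port, username, password, resource):
    """Build an elasticsearch-hadoop conf dict for a cluster whose nodes
    are only reachable through the declared endpoint (hypothetical helper)."""
    return {
        "es.nodes": host,
        "es.port": str(port),  # passed as a string, like the other values
        "es.resource": resource,
        "es.net.http.auth.user": username,
        "es.net.http.auth.pass": password,
        "es.net.ssl": "true",
        "es.net.ssl.cert.allow.self.signed": "true",
        # Only talk to the declared es.nodes, never to discovered node addresses.
        "es.nodes.wan.only": "true",
        # Probably redundant alongside wan.only; kept because it is what I ran with.
        "es.nodes.discovery": "false",
    }


# Placeholder values for illustration only.
es_live_conf = build_es_conf(
    "es.example.com", 9243, "user", "secret", "testindex/testdoc"
)
```

The resulting es_live_conf is then passed as conf=es_live_conf to sc.newAPIHadoopRDD exactly as in the question.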