Problem with retrieving data from ES into Spark

Hello, I'm quite new to ES and ES-Hadoop. I'm using ES-Hadoop 2.3.3, Spark 1.6.1, Elasticsearch 2.3.3, and pyspark to interact with Spark. The code I use to pull data out of ES is shown below:

conf = {"es.resource.read" : "logstash*/log", "es.nodes" : "IP_ADDRESS", "es.port" : "9200", "es.query" : '{"fields":["message","@timestamp","mist_objectid","severity"],"query":{"filtered":{"filter":{"bool":{"must":[{"exists":{"field":"message"}},{"exists":{"field":"mist_objectid"}},{"exists":{"field":"@timestamp"}},{"exists":{"field":"severity"}}]}}}}}'}
esRDD = sc.newAPIHadoopRDD("org.elasticsearch.hadoop.mr.EsInputFormat",\
    "org.apache.hadoop.io.NullWritable", "org.elasticsearch.hadoop.mr.LinkedMapWritable", conf=conf)

This code works fine when retrieving data from a local ES instance into local Spark. But when I try to retrieve data from our live ES cluster into my local Spark, it fails:

Traceback (most recent call last):
  File "<stdin>", line 2, in <module>
  File "/opt/spark/python/pyspark/context.py", line 644, in newAPIHadoopRDD
    jconf, batchSize)
  File "/opt/spark/python/lib/py4j-0.9-src.zip/py4j/java_gateway.py", line 813, in __call__
  File "/opt/spark/python/pyspark/sql/utils.py", line 45, in deco
    return f(*a, **kw)
  File "/opt/spark/python/lib/py4j-0.9-src.zip/py4j/protocol.py", line 308, in get_return_value
py4j.protocol.Py4JJavaError: An error occurred while calling z:org.apache.spark.api.python.PythonRDD.newAPIHadoopRDD.
: org.elasticsearch.hadoop.EsHadoopIllegalArgumentException: Cannot detect ES version - typically this happens if the network/Elasticsearch cluster is not accessible or when targeting a WAN/Cloud instance without the proper setting 'es.nodes.wan.only'
        at org.elasticsearch.hadoop.rest.InitializationUtils.discoverEsVersion(InitializationUtils.java:190)
        at org.elasticsearch.hadoop.rest.RestService.findPartitions(RestService.java:231)
        at org.elasticsearch.hadoop.mr.EsInputFormat.getSplits(EsInputFormat.java:457)
        at org.elasticsearch.hadoop.mr.EsInputFormat.getSplits(EsInputFormat.java:438)
        at org.apache.spark.rdd.NewHadoopRDD.getPartitions(NewHadoopRDD.scala:120)
        at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:239)
        at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:237)
        ...
        at py4j.GatewayConnection.run(GatewayConnection.java:209)
        at java.lang.Thread.run(Thread.java:745)
Caused by: org.elasticsearch.hadoop.rest.EsHadoopInvalidRequest: missing authentication token for REST request [/]
null
        at org.elasticsearch.hadoop.rest.RestClient.checkResponse(RestClient.java:478)
        at org.elasticsearch.hadoop.rest.RestClient.execute(RestClient.java:436)
        at org.elasticsearch.hadoop.rest.RestClient.execute(RestClient.java:414)
        at org.elasticsearch.hadoop.rest.RestClient.execute(RestClient.java:418)
        at org.elasticsearch.hadoop.rest.RestClient.get(RestClient.java:122)
        at org.elasticsearch.hadoop.rest.RestClient.esVersion(RestClient.java:564)
        at org.elasticsearch.hadoop.rest.InitializationUtils.discoverEsVersion(InitializationUtils.java:178)
        ... 32 more

By the way, I can successfully connect to the host and port via telnet. Can you please give me some advice?
Thanks in advance.

Hello,

It seems that you're missing an authentication token for the REST request that discovers the version of Elasticsearch that you're using. Please double check your security settings on both the client and the Elasticsearch cluster.
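If the cluster requires authentication, ES-Hadoop needs the credentials supplied through its own settings in the connector configuration. A minimal sketch of what that could look like with the pyspark conf dict from your first post (the user/password values are placeholders, and the commented-out 'es.nodes.wan.only' flag is only an assumption based on the hint in your error message):

# Sketch: same conf as before, plus HTTP basic auth credentials for ES-Hadoop.
# "USER_NAME" / "PASSWORD" are placeholders for your own values.
conf = {
    "es.resource.read": "logstash*/log",
    "es.nodes": "IP_ADDRESS",
    "es.port": "9200",
    "es.net.http.auth.user": "USER_NAME",   # user with read access to the index
    "es.net.http.auth.pass": "PASSWORD",
    # Possibly needed when reaching the cluster over a WAN/cloud setup,
    # as suggested by the 'es.nodes.wan.only' note in your first error:
    # "es.nodes.wan.only": "true",
    "es.query": '{"query":{"match_all":{}}}'  # keep your original query here
}
esRDD = sc.newAPIHadoopRDD(
    "org.elasticsearch.hadoop.mr.EsInputFormat",
    "org.apache.hadoop.io.NullWritable",
    "org.elasticsearch.hadoop.mr.LinkedMapWritable",
    conf=conf)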

Thank you for your message. You were right; I have now set my credentials in ES-Hadoop. But now it gives me the following error:

>>> conf = {"es.net.http.auth.user": "USER_NAME", "es.net.http.auth.pass": "PASSWORD", "es.resource.read" : "logstash-2016.07.26/log", "es.nodes" : "IP_ADDRESS", "es.port" : "9200", "es.query" : '{"fields":["message","@timestamp","mist_objectid","severity"],"query":{"filtered":{"filter":{"bool":{"must":[{"exists":{"field":"message"}},{"exists":{"field":"mist_objectid"}},{"exists":{"field":"@timestamp"}},{"exists":{"field":"severity"}}]}}}}}'}
>>> cleaned_logs = sc.newAPIHadoopRDD("org.elasticsearch.hadoop.mr.EsInputFormat",\
...     "org.apache.hadoop.io.NullWritable", "org.elasticsearch.hadoop.mr.LinkedMapWritable", conf=conf)
Traceback (most recent call last):
  File "<stdin>", line 2, in <module>
  File "/opt/spark/python/pyspark/context.py", line 644, in newAPIHadoopRDD
    jconf, batchSize)
  File "/opt/spark/python/lib/py4j-0.9-src.zip/py4j/java_gateway.py", line 813, in __call__
  File "/opt/spark/python/pyspark/sql/utils.py", line 45, in deco
    return f(*a, **kw)
  File "/opt/spark/python/lib/py4j-0.9-src.zip/py4j/protocol.py", line 308, in get_return_value
py4j.protocol.Py4JJavaError: An error occurred while calling z:org.apache.spark.api.python.PythonRDD.newAPIHadoopRDD.
: org.elasticsearch.hadoop.rest.EsHadoopInvalidRequest: [HEAD] on [logstash-2016.07.26/log] failed; server[IP_ADDRESS:9200] returned [403|Forbidden:]
        at org.elasticsearch.hadoop.rest.RestClient.checkResponse(RestClient.java:478)
        at org.elasticsearch.hadoop.rest.RestClient.executeNotFoundAllowed(RestClient.java:449)
        ...

Do I need extra configuration in my Spark code, or should I do something else on the ES side to access the data from a Spark job? By the way, my user has the read privilege on ES and I can retrieve data on the command line via cURL. But when I try the same thing inside the Spark code, it gives me the 403 Forbidden error.
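For reference, the call that fails is the index-existence check ES-Hadoop performs before reading. A minimal sketch to reproduce that same request outside Spark with the same credentials (assuming the Python requests library; host and credentials are placeholders):

# Sketch: reproduce the HEAD request from the error ([HEAD] on [logstash-2016.07.26/log])
# with the same credentials, to see whether the 403 also occurs outside Spark.
import requests

resp = requests.head(
    "http://IP_ADDRESS:9200/logstash-2016.07.26/log",  # index/type from the error
    auth=("USER_NAME", "PASSWORD"))                    # same credentials as in conf
print(resp.status_code)  # 403 here would point at ES-side privileges, not Spark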
Thanks
