Hello, I'm quite new to ES and ES-Hadoop. I'm using ES-Hadoop 2.3.3, Spark 1.6.1, Elasticsearch 2.3.3, and pyspark to interact with Spark. The code I use to pull data out of ES looks like this:
conf = {"es.resource.read" : "logstash*/log", "es.nodes" : "IP_ADDRESS", "es.port" : "9200", "es.query" : '{"fields":["message","@timestamp","mist_objectid","severity"],"query":{"filtered":{"filter":{"bool":{"must":[{"exists":{"field":"message"}},{"exists":{"field":"mist_objectid"}},{"exists":{"field":"@timestamp"}},{"exists":{"field":"severity"}}]}}}}}'}
esRDD = sc.newAPIHadoopRDD("org.elasticsearch.hadoop.mr.EsInputFormat",\
"org.apache.hadoop.io.NullWritable", "org.elasticsearch.hadoop.mr.LinkedMapWritable", conf=conf)
This works fine when retrieving data from a local ES instance into local Spark. But when I try to retrieve data from our live ES cluster into my local Spark, it fails:
Traceback (most recent call last):
File "<stdin>", line 2, in <module>
File "/opt/spark/python/pyspark/context.py", line 644, in newAPIHadoopRDD
jconf, batchSize)
File "/opt/spark/python/lib/py4j-0.9-src.zip/py4j/java_gateway.py", line 813, in __call__
File "/opt/spark/python/pyspark/sql/utils.py", line 45, in deco
return f(*a, **kw)
File "/opt/spark/python/lib/py4j-0.9-src.zip/py4j/protocol.py", line 308, in get_return_value
py4j.protocol.Py4JJavaError: An error occurred while calling z:org.apache.spark.api.python.PythonRDD.newAPIHadoopRDD.
: org.elasticsearch.hadoop.EsHadoopIllegalArgumentException: Cannot detect ES version - typically this happens if the network/Elasticsearch cluster is not accessible or when targeting a WAN/Cloud instance without the proper setting 'es.nodes.wan.only'
at org.elasticsearch.hadoop.rest.InitializationUtils.discoverEsVersion(InitializationUtils.java:190)
at org.elasticsearch.hadoop.rest.RestService.findPartitions(RestService.java:231)
at org.elasticsearch.hadoop.mr.EsInputFormat.getSplits(EsInputFormat.java:457)
at org.elasticsearch.hadoop.mr.EsInputFormat.getSplits(EsInputFormat.java:438)
at org.apache.spark.rdd.NewHadoopRDD.getPartitions(NewHadoopRDD.scala:120)
at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:239)
at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:237)
...
at py4j.GatewayConnection.run(GatewayConnection.java:209)
at java.lang.Thread.run(Thread.java:745)
Caused by: org.elasticsearch.hadoop.rest.EsHadoopInvalidRequest: missing authentication token for REST request [/]
null
at org.elasticsearch.hadoop.rest.RestClient.checkResponse(RestClient.java:478)
at org.elasticsearch.hadoop.rest.RestClient.execute(RestClient.java:436)
at org.elasticsearch.hadoop.rest.RestClient.execute(RestClient.java:414)
at org.elasticsearch.hadoop.rest.RestClient.execute(RestClient.java:418)
at org.elasticsearch.hadoop.rest.RestClient.get(RestClient.java:122)
at org.elasticsearch.hadoop.rest.RestClient.esVersion(RestClient.java:564)
at org.elasticsearch.hadoop.rest.InitializationUtils.discoverEsVersion(InitializationUtils.java:178)
... 32 more
By the way, I can connect to the host and port via telnet successfully. Can you please give me some advice?
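Reading the trace, the error message itself mentions 'es.nodes.wan.only', and the root cause is "missing authentication token", so I'm guessing the live cluster needs to be treated as a WAN endpoint and is protected by Shield (or some other basic-auth layer). Is something like the following the right direction? (USER/PASS are just placeholders; I don't know what the real credentials should be.)

conf = {
    "es.resource.read": "logstash*/log",
    "es.nodes": "IP_ADDRESS",
    "es.port": "9200",
    # Treat the cluster as WAN/Cloud: only talk to the declared nodes
    "es.nodes.wan.only": "true",
    # Basic-auth credentials for Shield -- placeholders here
    "es.net.http.auth.user": "USER",
    "es.net.http.auth.pass": "PASS",
    "es.query": '...same query as above...'
}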
Thanks in advance.