Trying to Unit Test with ElasticSearch Spark, Getting Error

Elasticsearch newbie here. I'm trying to run unit tests using local Spark and Elasticsearch on localhost, but I'm getting the following error at runtime:

[info] - Write to ElasticSearch *** FAILED ***
[info]   org.apache.spark.SparkException: Job aborted due to stage failure: Task 0 in stage 0.0 failed 1 times, most recent failure: Lost task 0.0 in stage 0.0 (TID 0, localhost): org.elasticsearch.hadoop.EsHadoopIllegalArgumentException: Cannot detect ES version - typically this happens if the network/Elasticsearch cluster is not accessible or when targeting a WAN/Cloud instance without the proper setting 'es.nodes.wan.only'
[info] 	at org.elasticsearch.hadoop.rest.InitializationUtils.discoverEsVersion(InitializationUtils.java:190)
[info] 	at org.elasticsearch.hadoop.rest.RestService.createWriter(RestService.java:379)
[info] 	at org.elasticsearch.spark.rdd.EsRDDWriter.write(EsRDDWriter.scala:40)
[info] 	at org.elasticsearch.spark.rdd.EsSpark$$anonfun$doSaveToEs$1.apply(EsSpark.scala:84)
[info] 	at org.elasticsearch.spark.rdd.EsSpark$$anonfun$doSaveToEs$1.apply(EsSpark.scala:84)
[info] 	at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:66)
[info] 	at org.apache.spark.scheduler.Task.run(Task.scala:89)
[info] 	at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:214)
[info] 	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
[info] 	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
[info] 	at java.lang.Thread.run(Thread.java:745)
[info] Caused by: org.elasticsearch.hadoop.rest.EsHadoopInvalidRequest: [GET] on [] failed; server[127.0.0.1:9200] returned [504|null:]
[info] 	at org.elasticsearch.hadoop.rest.RestClient.checkResponse(RestClient.java:491)
[info] 	at org.elasticsearch.hadoop.rest.RestClient.execute(RestClient.java:449)
[info] 	at org.elasticsearch.hadoop.rest.RestClient.execute(RestClient.java:427)
[info] 	at org.elasticsearch.hadoop.rest.RestClient.execute(RestClient.java:431)
[info] 	at org.elasticsearch.hadoop.rest.RestClient.get(RestClient.java:130)
[info] 	at org.elasticsearch.hadoop.rest.RestClient.esVersion(RestClient.java:579)
[info] 	at org.elasticsearch.hadoop.rest.InitializationUtils.discoverEsVersion(InitializationUtils.java:178)
[info] 	... 10 more
[info] 
[info] Driver stacktrace:
[info]   at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$failJobAndIndependentStages(DAGScheduler.scala:1431)
[info]   at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1419)
[info]   at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1418)
[info]   at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
[info]   at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:48)
[info]   at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:1418)
[info]   at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:799)
[info]   at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:799)
[info]   at scala.Option.foreach(Option.scala:257)
[info]   at org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:799)
[info]   ...

The code is as follows:

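import org.apache.spark.{SparkConf, SparkContext}
import org.elasticsearch.spark._ // brings saveToEs into scope on RDDs
import org.scalatest.{BeforeAndAfter, FunSpec, ShouldMatchers}

class BaselineTest extends FunSpec with ShouldMatchers with BeforeAndAfter {

  // Local Spark; the connector targets localhost:9200 unless es.nodes is set
  private val conf = new SparkConf()
    .setMaster("local")
    .setAppName("Baseline Hourly Job")
    .set("spark.ui.enabled", "false")
    .set("es.index.auto.create", "true")

  private var sc: SparkContext = _

  before {
    info("Before info block initiated")
    sc = new SparkContext(conf)
  }

  after {
    info("After info block initiated")
    sc.stop()
  }

  describe("Baseline Test") {

    it("Write to ElasticSearch") {
      val numbers = Map("one" -> 1, "two" -> 2, "three" -> 3)
      val airports = Map("arrival" -> "Otopeni", "SFO" -> "San Fran")
      // Writes both maps as documents to index "spark", type "docs"
      sc.makeRDD(Seq(numbers, airports)).saveToEs("spark/docs")
    }
  }
}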

Hi @sarunkum,

I don't know anything about Spark but if you look at the trace you can see this line:

Inspecting the stack trace further, it looks to me like the client tried to issue an HTTP GET request to http://127.0.0.1:9200/ but got HTTP status 504 (which is really odd) instead of HTTP 200. So I'd try running:

curl http://127.0.0.1:9200/

and check the response. Is Elasticsearch running on port 9200 on the same machine where you execute this?

On my machine, the command above shows:

{
  "name" : "0Xaj57L",
  "cluster_name" : "distribution_run",
  "version" : {
    "number" : "5.0.0-alpha6",
    "build_hash" : "2a00c9d",
    "build_date" : "2016-08-30T08:26:36.232Z",
    "build_snapshot" : true,
    "lucene_version" : "6.2.0"
  },
  "tagline" : "You Know, for Search"
}
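
The same check can also be sketched from Scala, which may help when the test runs somewhere curl is not handy (a build agent, for instance). This is only a minimal sketch using the JDK's `HttpURLConnection`; the `EsProbe` object and the `statusOf` helper are hypothetical names made up for illustration, not part of elasticsearch-hadoop. It reports the HTTP status code that the JVM sees for a GET on the given URL, which should be 200 for a healthy node.

```scala
import java.net.{HttpURLConnection, URL}
import scala.util.Try

// Hypothetical helper, for illustration only.
object EsProbe {

  /** Issues a GET on `url` and returns the HTTP status code,
    * or a Failure if the URL is invalid or the connection fails. */
  def statusOf(url: String, timeoutMs: Int = 2000): Try[Int] = Try {
    val conn = new URL(url).openConnection().asInstanceOf[HttpURLConnection]
    try {
      conn.setRequestMethod("GET")
      conn.setConnectTimeout(timeoutMs)
      conn.setReadTimeout(timeoutMs)
      conn.getResponseCode // returns the code even for non-2xx responses such as 504
    } finally conn.disconnect()
  }

  def main(args: Array[String]): Unit =
    // Same address the connector reported in the stack trace
    println(statusOf("http://127.0.0.1:9200/"))
}
```

If this prints a different status than curl does for the same URL, the difference is probably in how the JVM reaches the node rather than in Elasticsearch itself.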

Daniel

Hi Daniel,

My understanding is that I can bring up a local node at runtime and then perform CRUD operations against it, similar to HBase. Is there an expectation that the local node is already up and running through some other means? Previously, I was able to accomplish this in JUnit using the Node, Client, and NodeBuilder libraries. I thought the concept was the same for this elasticsearch-spark setup. Am I misunderstanding something?

HTTP 504 is indeed a very strange response to that request. @sarunkum, are you running Elasticsearch embedded in your test, or is it running as a separate process? If you're running it embedded, try running it as a separate process and see if the problem persists.

It was embedded. Ultimately, I need to be able to execute automated tests directly from Jenkins builds without external environment dependencies.

@sarunkum if you run Elasticsearch as a separate process does the problem still persist? I ask because there could be valuable information in the logs that might explain why the server is responding with a 504.
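
When testing against a separately started node, it may also help to make the connector's target explicit instead of relying on defaults. The following is a sketch only: `EsTestConf` and `build` are hypothetical names, and whether `es.nodes.wan.only` is appropriate here is an assumption taken from the hint in the error message, not a confirmed fix. The keys `es.nodes`, `es.port`, and `es.nodes.wan.only` are elasticsearch-hadoop configuration settings.

```scala
import org.apache.spark.SparkConf

// Hypothetical helper, for illustration only.
object EsTestConf {

  /** Builds the test SparkConf with the Elasticsearch target spelled out. */
  def build(host: String = "127.0.0.1", port: Int = 9200): SparkConf =
    new SparkConf()
      .setMaster("local")
      .setAppName("Baseline Hourly Job")
      .set("spark.ui.enabled", "false")
      .set("es.index.auto.create", "true")
      .set("es.nodes", host)             // node(s) the connector should contact
      .set("es.port", port.toString)     // HTTP port of the node
      .set("es.nodes.wan.only", "true")  // assumption: use only the declared node, skip node discovery
}
```

Using `EsTestConf.build()` in place of the `conf` value in the test would keep the rest of the spec unchanged.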

Hi James,

I installed Elasticsearch on my machine and brought it up manually, then ran curl from a separate window, and everything works fine that way. I then left it running and executed my unit test code, but that didn't work: same failure. Any ideas?

Sarosh

Actually, strangely, after letting it run for a while, I'm getting gateway timeouts.

@sarunkum, are there any anomalies present in the Elasticsearch server logs surrounding the 504 errors? Are these happening at the same place as before (discovering the ES version)?