Trying to Unit Test with ElasticSearch Spark, Getting Error


(Sarosh Arunkumar) #1

Elasticsearch newbie here. I'm trying to run unit tests using a local Spark and localhost for Elasticsearch, but I'm getting the following error at runtime:

[info] - Write to ElasticSearch *** FAILED ***
[info]   org.apache.spark.SparkException: Job aborted due to stage failure: Task 0 in stage 0.0 failed 1 times, most recent failure: Lost task 0.0 in stage 0.0 (TID 0, localhost): org.elasticsearch.hadoop.EsHadoopIllegalArgumentException: Cannot detect ES version - typically this happens if the network/Elasticsearch cluster is not accessible or when targeting a WAN/Cloud instance without the proper setting 'es.nodes.wan.only'
[info] 	at org.elasticsearch.hadoop.rest.InitializationUtils.discoverEsVersion(InitializationUtils.java:190)
[info] 	at org.elasticsearch.hadoop.rest.RestService.createWriter(RestService.java:379)
[info] 	at org.elasticsearch.spark.rdd.EsRDDWriter.write(EsRDDWriter.scala:40)
[info] 	at org.elasticsearch.spark.rdd.EsSpark$$anonfun$doSaveToEs$1.apply(EsSpark.scala:84)
[info] 	at org.elasticsearch.spark.rdd.EsSpark$$anonfun$doSaveToEs$1.apply(EsSpark.scala:84)
[info] 	at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:66)
[info] 	at org.apache.spark.scheduler.Task.run(Task.scala:89)
[info] 	at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:214)
[info] 	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
[info] 	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
[info] 	at java.lang.Thread.run(Thread.java:745)
[info] Caused by: org.elasticsearch.hadoop.rest.EsHadoopInvalidRequest: [GET] on [] failed; server[127.0.0.1:9200] returned [504|null:]
[info] 	at org.elasticsearch.hadoop.rest.RestClient.checkResponse(RestClient.java:491)
[info] 	at org.elasticsearch.hadoop.rest.RestClient.execute(RestClient.java:449)
[info] 	at org.elasticsearch.hadoop.rest.RestClient.execute(RestClient.java:427)
[info] 	at org.elasticsearch.hadoop.rest.RestClient.execute(RestClient.java:431)
[info] 	at org.elasticsearch.hadoop.rest.RestClient.get(RestClient.java:130)
[info] 	at org.elasticsearch.hadoop.rest.RestClient.esVersion(RestClient.java:579)
[info] 	at org.elasticsearch.hadoop.rest.InitializationUtils.discoverEsVersion(InitializationUtils.java:178)
[info] 	... 10 more
[info] 
[info] Driver stacktrace:
[info]   at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$failJobAndIndependentStages(DAGScheduler.scala:1431)
[info]   at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1419)
[info]   at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1418)
[info]   at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
[info]   at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:48)
[info]   at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:1418)
[info]   at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:799)
[info]   at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:799)
[info]   at scala.Option.foreach(Option.scala:257)
[info]   at org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:799)
[info]   ...

The code is as follows:

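import org.apache.spark.{SparkConf, SparkContext}
import org.elasticsearch.spark._ // brings saveToEs into scope for RDDs
import org.scalatest.{BeforeAndAfter, FunSpec, ShouldMatchers}

class BaselineTest extends FunSpec with ShouldMatchers with BeforeAndAfter {

  // Local Spark with the UI disabled; the target index is created on first write
  private val conf = new SparkConf()
    .setMaster("local")
    .setAppName("Baseline Hourly Job")
    .set("spark.ui.enabled", "false")
    .set("es.index.auto.create", "true")

  private var sc: SparkContext = _

  // A fresh SparkContext is created before each test and stopped afterwards
  before {
    info("Before info block initiated")
    sc = new SparkContext(conf)
  }

  after {
    info("After info block initiated")
    sc.stop()
  }

  describe("Baseline Test") {

    it("Write to ElasticSearch") {
      val numbers = Map("one" -> 1, "two" -> 2, "three" -> 3)
      val airports = Map("arrival" -> "Otopeni", "SFO" -> "San Fran")
      sc.makeRDD(Seq(numbers, airports)).saveToEs("spark/docs")
    }
  }
}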
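
The exception text itself names 'es.nodes.wan.only', so one low-cost experiment is to spell out the connector's connection settings instead of relying on defaults. The sketch below is only an illustration and not a confirmed fix for the 504: `es.nodes`, `es.port`, and `es.nodes.wan.only` are elasticsearch-hadoop configuration keys, while the `EsTestSettings` object and the values in it are hypothetical choices for a single local node.

```scala
// Hypothetical helper (not part of the original test): explicit
// elasticsearch-hadoop connection settings for a single local node.
object EsTestSettings {
  val settings: Map[String, String] = Map(
    "es.nodes" -> "127.0.0.1",       // assumed host of the local node
    "es.port" -> "9200",             // assumed HTTP port
    "es.nodes.wan.only" -> "true",   // talk only to the declared node, no discovery
    "es.index.auto.create" -> "true" // same setting as in the test above
  )
}

// Usage inside the test class (requires Spark on the classpath):
//   private val conf = new SparkConf()
//     .setMaster("local")
//     .setAppName("Baseline Hourly Job")
//     .setAll(EsTestSettings.settings)
```

Keeping the settings in one map makes it easy to see exactly which host and port the connector is told to use when comparing against a manual curl check.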

(Daniel Mitterdorfer) #2

Hi @sarunkum,

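I don't know anything about Spark, but if you look at the trace you can see this line:

Caused by: org.elasticsearch.hadoop.rest.EsHadoopInvalidRequest: [GET] on [] failed; server[127.0.0.1:9200] returned [504|null:]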

Inspecting the stack trace further, it looks to me as if the client tried to issue an HTTP GET request to http://127.0.0.1:9200/ but got HTTP status 504 (which is really odd) instead of HTTP 200. So I'd try:

curl http://127.0.0.1:9200/

and check the response. Is Elasticsearch running on port 9200 on the same machine where you execute this?

On my machine, the command above shows:

{
  "name" : "0Xaj57L",
  "cluster_name" : "distribution_run",
  "version" : {
    "number" : "5.0.0-alpha6",
    "build_hash" : "2a00c9d",
    "build_date" : "2016-08-30T08:26:36.232Z",
    "build_snapshot" : true,
    "lucene_version" : "6.2.0"
  },
  "tagline" : "You Know, for Search"
}
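
The same check can also be made from the JVM that runs the test, which shows what status code that process sees rather than what the shell sees. This is a minimal sketch using only the JDK's `HttpURLConnection`; the `EsProbe` object, its `status` method, and the 2-second timeout are hypothetical names and values chosen for illustration.

```scala
import java.net.{HttpURLConnection, URL}
import scala.util.Try

// Hypothetical helper: returns the HTTP status code of a GET on the given URL,
// or None if the URL is malformed or no connection could be made.
object EsProbe {
  def status(url: String, timeoutMs: Int = 2000): Option[Int] = Try {
    val conn = new URL(url).openConnection().asInstanceOf[HttpURLConnection]
    conn.setRequestMethod("GET")
    conn.setConnectTimeout(timeoutMs)
    conn.setReadTimeout(timeoutMs)
    // getResponseCode returns error statuses such as 504 instead of throwing
    try conn.getResponseCode finally conn.disconnect()
  }.toOption

  def main(args: Array[String]): Unit =
    status("http://127.0.0.1:9200/") match {
      case Some(200)  => println("Elasticsearch answered with 200")
      case Some(code) => println(s"Unexpected HTTP status: $code")
      case None       => println("Could not connect to 127.0.0.1:9200")
    }
}
```

Calling `EsProbe.status("http://127.0.0.1:9200/")` at the start of a test would make it visible whether the 504 is also returned outside the connector.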

Daniel


(Sarosh Arunkumar) #3

Hi Daniel,

My understanding was that I could bring up a local node at runtime and then perform CRUD operations against it, similar to HBase. Is the local node expected to be already up and running through some other means? Previously, I accomplished this in JUnit using the Node, Client, and NodeBuilder classes, and I thought the concept was the same for this elasticsearch-spark setup. Am I misunderstanding something?


(James Baiera) #4

HTTP 504 is a very strange response from that request indeed. @sarunkum, are you running Elasticsearch embedded in your test, or is it running as a separate process? If you're running it embedded, try running it as a separate process and see if the problem persists.


(Sarosh Arunkumar) #5

It was embedded. Ultimately, I need to be able to execute automated tests directly from Jenkins builds without external environment dependencies.


(James Baiera) #6

@sarunkum if you run Elasticsearch as a separate process does the problem still persist? I ask because there could be valuable information in the logs that might explain why the server is responding with a 504.


(Sarosh Arunkumar) #7

Hi James,

I installed Elasticsearch on my machine, brought it up manually, and ran curl from a separate window; everything works fine that way. I then left it running and executed my unit test code, but it failed in the same way. Any ideas?

Sarosh


(Sarosh Arunkumar) #8

Actually, strangely, after letting it run for a while, I'm getting gateway timeouts :(


(James Baiera) #9

@sarunkum, are there any anomalies present in the Elasticsearch server logs surrounding the 504 errors? Are these happening at the same place as before (discovering the ES version)?

