Elasticsearch newbie here. I'm trying to unit test a Spark job against a local Spark master and an Elasticsearch instance on localhost, but the test fails at runtime with the following error:
[info] - Write to ElasticSearch *** FAILED ***
[info] org.apache.spark.SparkException: Job aborted due to stage failure: Task 0 in stage 0.0 failed 1 times, most recent failure: Lost task 0.0 in stage 0.0 (TID 0, localhost): org.elasticsearch.hadoop.EsHadoopIllegalArgumentException: Cannot detect ES version - typically this happens if the network/Elasticsearch cluster is not accessible or when targeting a WAN/Cloud instance without the proper setting 'es.nodes.wan.only'
[info] at org.elasticsearch.hadoop.rest.InitializationUtils.discoverEsVersion(InitializationUtils.java:190)
[info] at org.elasticsearch.hadoop.rest.RestService.createWriter(RestService.java:379)
[info] at org.elasticsearch.spark.rdd.EsRDDWriter.write(EsRDDWriter.scala:40)
[info] at org.elasticsearch.spark.rdd.EsSpark$$anonfun$doSaveToEs$1.apply(EsSpark.scala:84)
[info] at org.elasticsearch.spark.rdd.EsSpark$$anonfun$doSaveToEs$1.apply(EsSpark.scala:84)
[info] at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:66)
[info] at org.apache.spark.scheduler.Task.run(Task.scala:89)
[info] at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:214)
[info] at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
[info] at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
[info] at java.lang.Thread.run(Thread.java:745)
[info] Caused by: org.elasticsearch.hadoop.rest.EsHadoopInvalidRequest: [GET] on [] failed; server[127.0.0.1:9200] returned [504|null:]
[info] at org.elasticsearch.hadoop.rest.RestClient.checkResponse(RestClient.java:491)
[info] at org.elasticsearch.hadoop.rest.RestClient.execute(RestClient.java:449)
[info] at org.elasticsearch.hadoop.rest.RestClient.execute(RestClient.java:427)
[info] at org.elasticsearch.hadoop.rest.RestClient.execute(RestClient.java:431)
[info] at org.elasticsearch.hadoop.rest.RestClient.get(RestClient.java:130)
[info] at org.elasticsearch.hadoop.rest.RestClient.esVersion(RestClient.java:579)
[info] at org.elasticsearch.hadoop.rest.InitializationUtils.discoverEsVersion(InitializationUtils.java:178)
[info] ... 10 more
[info]
[info] Driver stacktrace:
[info] at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$failJobAndIndependentStages(DAGScheduler.scala:1431)
[info] at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1419)
[info] at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1418)
[info] at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
[info] at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:48)
[info] at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:1418)
[info] at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:799)
[info] at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:799)
[info] at scala.Option.foreach(Option.scala:257)
[info] at org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:799)
[info] ...
The code is as follows:
    import org.apache.spark.{SparkConf, SparkContext}
    import org.elasticsearch.spark._ // brings saveToEs into scope on RDDs
    import org.scalatest.{BeforeAndAfter, FunSpec}
    import org.scalatest.matchers.ShouldMatchers

    class BaselineTest extends FunSpec with ShouldMatchers with BeforeAndAfter {

      private val conf = new SparkConf()
        .setMaster("local")
        .setAppName("Baseline Hourly Job")
        .set("spark.ui.enabled", "false")
        .set("es.index.auto.create", "true")

      private var sc: SparkContext = _

      before {
        info("Before info block initiated")
        sc = new SparkContext(conf)
      }

      after {
        info("After info block initiated")
        sc.stop()
      }

      describe("Baseline Test") {
        it("Write to ElasticSearch") {
          val numbers = Map("one" -> 1, "two" -> 2, "three" -> 3)
          val airports = Map("arrival" -> "Otopeni", "SFO" -> "San Fran")
          sc.makeRDD(Seq(numbers, airports)).saveToEs("spark/docs")
        }
      }
    }