I'm having issues saving an RDD to Elasticsearch after upgrading to ES 5.5.0.
The libraries used are:
libraryDependencies += "org.apache.spark" %% "spark-core" % "2.2.0"
libraryDependencies += "org.elasticsearch" % "elasticsearch-spark-20_2.11" % "5.5.0"
The relevant part of the code is:
val es_cfg = Map(
  "es.mapping.id" -> "sub_id",                 // use the sub_id field as the document _id
  "es.index.read.missing.as.empty" -> "true",  // treat reads on a missing index as empty instead of failing
  "es.nodes" -> elasticSearchNode
)

keyedBySubid.mapWithState(stateSpec)
  .foreachRDD { rdd =>
    if (!rdd.isEmpty()) {
      rdd.map { case Some(subscriptionInfo) => subscriptionInfo }
        .saveToEs(elasticSearchIndex, es_cfg)
      println("rdd saved")
    }
  }
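In case it helps to reproduce this outside the streaming job, a stripped-down standalone save along the following lines exercises the same saveToEs path (the toy document, index name and local master are placeholders, not the real application code):

import org.apache.spark.{SparkConf, SparkContext}
import org.elasticsearch.spark._  // brings saveToEs into scope on plain RDDs

object SaveToEsRepro {
  def main(args: Array[String]): Unit = {
    val elasticSearchNode  = "172.31.7.214"             // placeholder node
    val elasticSearchIndex = "subscription_info3/docs"  // index/type resource

    val es_cfg = Map(
      "es.mapping.id" -> "sub_id",
      "es.index.read.missing.as.empty" -> "true",
      "es.nodes" -> elasticSearchNode
    )

    val sc = new SparkContext(
      new SparkConf().setAppName("es-save-repro").setMaster("local[2]"))

    // Toy document; only sub_id really matters here since it is used as es.mapping.id
    val subscriptionInfo = Map("sub_id" -> "test-1", "status" -> "active")
    sc.parallelize(Seq(subscriptionInfo)).saveToEs(elasticSearchIndex, es_cfg)

    sc.stop()
  }
}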
The exception thrown by the streaming job is:
org.elasticsearch.hadoop.rest.EsHadoopInvalidRequest: [HEAD] on [subscription_info3/docs] failed; server[172.31.7.214:9200] returned [400|Bad Request:]
at org.elasticsearch.hadoop.rest.RestClient.checkResponse(RestClient.java:505)
at org.elasticsearch.hadoop.rest.RestClient.executeNotFoundAllowed(RestClient.java:476)
at org.elasticsearch.hadoop.rest.RestClient.exists(RestClient.java:547)
at org.elasticsearch.hadoop.rest.InitializationUtils.checkIndexStatus(InitializationUtils.java:71)
at org.elasticsearch.hadoop.rest.InitializationUtils.validateSettingsForReading(InitializationUtils.java:260)
at org.elasticsearch.hadoop.rest.RestService.findPartitions(RestService.java:217)
at org.elasticsearch.spark.rdd.AbstractEsRDD.esPartitions$lzycompute(AbstractEsRDD.scala:73)
at org.elasticsearch.spark.rdd.AbstractEsRDD.esPartitions(AbstractEsRDD.scala:72)
at org.elasticsearch.spark.rdd.AbstractEsRDD.getPartitions(AbstractEsRDD.scala:44)
at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:252)
at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:250)
at scala.Option.getOrElse(Option.scala:121)
at org.apache.spark.rdd.RDD.partitions(RDD.scala:250)
at org.apache.spark.rdd.MapPartitionsRDD.getPartitions(MapPartitionsRDD.scala:35)
at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:252)
at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:250)
at scala.Option.getOrElse(Option.scala:121)
at org.apache.spark.rdd.RDD.partitions(RDD.scala:250)
at org.apache.spark.ShuffleDependency.<init>(Dependency.scala:91)
at org.apache.spark.rdd.ShuffledRDD.getDependencies(ShuffledRDD.scala:91)
at org.apache.spark.rdd.RDD$$anonfun$dependencies$2.apply(RDD.scala:239)
at org.apache.spark.rdd.RDD$$anonfun$dependencies$2.apply(RDD.scala:237)
at scala.Option.getOrElse(Option.scala:121)
at org.apache.spark.rdd.RDD.dependencies(RDD.scala:237)
at org.apache.spark.scheduler.DAGScheduler.getShuffleDependencies(DAGScheduler.scala:424)
at org.apache.spark.scheduler.DAGScheduler.getOrCreateParentStages(DAGScheduler.scala:373)
at org.apache.spark.scheduler.DAGScheduler.createResultStage(DAGScheduler.scala:360)
at org.apache.spark.scheduler.DAGScheduler.handleJobSubmitted(DAGScheduler.scala:838)
at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.doOnReceive(DAGScheduler.scala:1613)
at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1605)
at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1594)
at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:48)
.......
Nothing interesting shows up in elasticsearch.log.
Any suggestion on what to investigate further would be really appreciated.
Simonluca