NoNodeAvailableException (None of the configured nodes are available) error when trying to push data to Elastic from a Spark job

Hi,

Any reason why we might be getting this error? The code works fine in non-distributed mode, but the same code, when run from a Spark job, is not able to reach Elastic.

Spark version: 2.0.1 built for Hadoop 2.4, Scala 2.11
Elastic version: 2.3.1

I've verified the Elastic hosts and the cluster name.

The spot in the code where this happens is:

    ClusterHealthResponse clusterHealthResponse = client.admin().cluster()
            .prepareHealth()
            .setWaitForGreenStatus()
            .setTimeout(TimeValue.timeValueSeconds(10))
            .get();
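
For reference, here is a minimal sketch of how the client behind that call gets configured for ES 2.3.x (the cluster name, host, and port below are placeholders, not our actual values):

    import java.net.InetSocketAddress;
    import org.elasticsearch.client.transport.TransportClient;
    import org.elasticsearch.common.settings.Settings;
    import org.elasticsearch.common.transport.InetSocketTransportAddress;

    // Placeholder values; cluster.name must match the cluster name on the ES nodes.
    Settings settings = Settings.settingsBuilder()
            .put("cluster.name", "my-cluster")
            .build();

    // Connect to the transport port (9300 by default), not the HTTP port (9200).
    TransportClient client = TransportClient.builder()
            .settings(settings)
            .build()
            .addTransportAddress(
                    new InetSocketTransportAddress(new InetSocketAddress("es-host-1", 9300)));

The health check above is simply the first place in our code where the client talks to the cluster, which is why the NoNodeAvailableException surfaces there.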

Stack trace:

Driver stacktrace:
at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$failJobAndIndependentStages(DAGScheduler.scala:1454)
at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1442)
at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1441)
at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:48)
at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:1441)
at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:811)
at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:811)
at scala.Option.foreach(Option.scala:257)
at org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:811)
at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.doOnReceive(DAGScheduler.scala:1667)
at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1622)
at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1611)
at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:48)
at org.apache.spark.scheduler.DAGScheduler.runJob(DAGScheduler.scala:632)
at org.apache.spark.SparkContext.runJob(SparkContext.scala:1890)
at org.apache.spark.SparkContext.runJob(SparkContext.scala:1903)
at org.apache.spark.SparkContext.runJob(SparkContext.scala:1916)
at org.apache.spark.SparkContext.runJob(SparkContext.scala:1930)
at org.apache.spark.rdd.RDD$$anonfun$foreachPartition$1.apply(RDD.scala:902)
at org.apache.spark.rdd.RDD$$anonfun$foreachPartition$1.apply(RDD.scala:900)
at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:112)
at org.apache.spark.rdd.RDD.withScope(RDD.scala:358)
at org.apache.spark.rdd.RDD.foreachPartition(RDD.scala:900)
at org.apache.spark.api.java.JavaRDDLike$class.foreachPartition(JavaRDDLike.scala:218)
at org.apache.spark.api.java.AbstractJavaRDDLike.foreachPartition(JavaRDDLike.scala:45)
at com.myco.MyDriver$3.call(com.myco.MyDriver.java:214)
at com.myco.MyDriver$3.call(KafkaSparkStreamingDriver.java:201)
at org.apache.spark.streaming.api.java.JavaDStreamLike$$anonfun$foreachRDD$1.apply(JavaDStreamLike.scala:272)
at org.apache.spark.streaming.api.java.JavaDStreamLike$$anonfun$foreachRDD$1.apply(JavaDStreamLike.scala:272)
at org.apache.spark.streaming.dstream.DStream$$anonfun$foreachRDD$1$$anonfun$apply$mcV$sp$3.apply(DStream.scala:627)
at org.apache.spark.streaming.dstream.DStream$$anonfun$foreachRDD$1$$anonfun$apply$mcV$sp$3.apply(DStream.scala:627)
at org.apache.spark.streaming.dstream.ForEachDStream$$anonfun$1$$anonfun$apply$mcV$sp$1.apply$mcV$sp(ForEachDStream.scala:51)
at org.apache.spark.streaming.dstream.ForEachDStream$$anonfun$1$$anonfun$apply$mcV$sp$1.apply(ForEachDStream.scala:51)
at org.apache.spark.streaming.dstream.ForEachDStream$$anonfun$1$$anonfun$apply$mcV$sp$1.apply(ForEachDStream.scala:51)
at org.apache.spark.streaming.dstream.DStream.createRDDWithLocalProperties(DStream.scala:415)
at org.apache.spark.streaming.dstream.ForEachDStream$$anonfun$1.apply$mcV$sp(ForEachDStream.scala:50)
at org.apache.spark.streaming.dstream.ForEachDStream$$anonfun$1.apply(ForEachDStream.scala:50)
at org.apache.spark.streaming.dstream.ForEachDStream$$anonfun$1.apply(ForEachDStream.scala:50)
at scala.util.Try$.apply(Try.scala:192)
at org.apache.spark.streaming.scheduler.Job.run(Job.scala:39)
at org.apache.spark.streaming.scheduler.JobScheduler$JobHandler$$anonfun$run$1.apply$mcV$sp(JobScheduler.scala:247)
at org.apache.spark.streaming.scheduler.JobScheduler$JobHandler$$anonfun$run$1.apply(JobScheduler.scala:247)
at org.apache.spark.streaming.scheduler.JobScheduler$JobHandler$$anonfun$run$1.apply(JobScheduler.scala:247)
at scala.util.DynamicVariable.withValue(DynamicVariable.scala:58)
at org.apache.spark.streaming.scheduler.JobScheduler$JobHandler.run(JobScheduler.scala:246)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
at java.lang.Thread.run(Thread.java:745)
Caused by: NoNodeAvailableException[None of the configured nodes are available: [{#transport#-1}{XX.XXX.XXX.XX}{XX.XXX.XXX.XX:9300}]]
at org.elasticsearch.client.transport.TransportClientNodesService.ensureNodesAreAvailable(TransportClientNodesService.java:290)
at org.elasticsearch.client.transport.TransportClientNodesService.execute(TransportClientNodesService.java:207)
at org.elasticsearch.client.transport.support.TransportProxyClient.execute(TransportProxyClient.java:55)
at org.elasticsearch.client.transport.TransportClient.doExecute(TransportClient.java:288)
at org.elasticsearch.client.support.AbstractClient.execute(AbstractClient.java:359)
at org.elasticsearch.client.support.AbstractClient$ClusterAdmin.execute(AbstractClient.java:853)
at org.elasticsearch.action.ActionRequestBuilder.execute(ActionRequestBuilder.java:86)
at org.elasticsearch.action.ActionRequestBuilder.execute(ActionRequestBuilder.java:56)
at org.elasticsearch.action.ActionRequestBuilder.get(ActionRequestBuilder.java:64)
at com.myco.MyDriver.work()

The Elastic client swallows exceptions in TransportClientNodesService, specifically in this method:

    protected List<DiscoveryNode> validateNewNodes(Set<DiscoveryNode> nodes) {
        for (Iterator<DiscoveryNode> it = nodes.iterator(); it.hasNext(); ) {
            DiscoveryNode node = it.next();
            if (!transportService.nodeConnected(node)) {
                try {
                    logger.trace("connecting to node [{}]", node);
                    transportService.connectToNode(node);
                } catch (Throwable e) {
                    it.remove();
                    logger.debug("failed to connect to discovered node [" + node + "]", e);
                }
            }
        }

        return Collections.unmodifiableList(new ArrayList<>(nodes));
    }

Note that it only logs the swallowed exception at the DEBUG level. It took a considerable amount of effort to unravel this and get to the actual error, which turned out to be that the META-INF/services entries for the Elasticsearch and Lucene SPI implementations were not being included when the Spark job uber-jar was built.

The actual exception masked by the exception swallowing code was:
java.lang.IllegalArgumentException: An SPI class of type org.apache.lucene.codecs.PostingsFormat with name 'Lucene50' does not exist. You need to add the corresponding JAR file supporting this SPI to your classpath. The current classpath supports the following names: [es090, completion090, XBloomFilter]

The solution is to use org.apache.maven.plugins.shade.resource.ServicesResourceTransformer when building the Spark job uber-jar with Maven, so that the META-INF/services files from all dependencies get merged rather than overwritten; there is a StackOverflow post describing exactly this.
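
In case it helps others, the relevant part of the shade plugin configuration looks roughly like this (plugin version and unrelated settings omitted; adjust for your build):

    <plugin>
      <groupId>org.apache.maven.plugins</groupId>
      <artifactId>maven-shade-plugin</artifactId>
      <executions>
        <execution>
          <phase>package</phase>
          <goals>
            <goal>shade</goal>
          </goals>
          <configuration>
            <transformers>
              <!-- Merge META-INF/services files from all dependencies instead of
                   letting the last jar processed silently win. -->
              <transformer implementation="org.apache.maven.plugins.shade.resource.ServicesResourceTransformer"/>
            </transformers>
          </configuration>
        </execution>
      </executions>
    </plugin>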

Swallowing exceptions, logging them only at the DEBUG level, and surfacing a catch-all NoNodeAvailableException instead seems problematic to me and worthy of a bug report.

@dgoldenberg Is there any reason why you would be using the Elasticsearch client directly in the job code instead of leveraging the ES-Hadoop library? With ES-Hadoop you get automatic failover of tasks to different shards during reads, as well as helpful settings for modifying which nodes in a cluster are allowed to receive writes.

Hi James,

Thanks for your response. All we need is a generic way to send documents to Elastic. We don't currently need a native Elastic-to-Hadoop integration, no reads, and no "modifying which nodes in a cluster are allowed to receive writes". We just need a way to publish, generically, to Elastic as a destination, where that destination is defined as a set of nodes plus the cluster name and the index name to write to.

I'm not sure how ES-Hadoop would be relevant here.

My assumption was that we should be able to use the plain ES client API, which actually works fine unless something is misconfigured; that is where the exceptions in TransportClientNodesService come into play.
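
For what it's worth, the publish path is little more than this (a simplified sketch; the index, type, and document below are placeholders):

    import org.elasticsearch.action.index.IndexResponse;

    // "client" is the TransportClient configured with our node addresses and cluster name.
    String json = "{\"field\":\"value\"}";   // placeholder document

    IndexResponse response = client.prepareIndex("my-index", "my-type")
            .setSource(json)
            .get();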

The swallowing of exceptions in TransportClientNodesService is the cause of the issue I'm reporting. I have filed an issue on GitHub about that.

If you are trying to write data to Elasticsearch from Spark, then ES-Hadoop could help you greatly by cutting down on the amount of code needed to do so.

It's as easy as including the jar in your job and using:

    import org.elasticsearch.spark._
    rdd.saveToEs("index/type", Map("es.nodes" -> nodes))
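
If your driver code is Java (as the stack trace suggests), the equivalent through the Java API would look roughly like this (an untested sketch; the RDD and node list are placeholders):

    import java.util.HashMap;
    import java.util.Map;
    import org.elasticsearch.spark.rdd.api.java.JavaEsSpark;

    // "docs" is a JavaRDD of documents (e.g. a Map<String, Object> per document)
    // and "nodes" is a comma-separated list of ES hosts; both are placeholders.
    Map<String, String> cfg = new HashMap<>();
    cfg.put("es.nodes", nodes);

    JavaEsSpark.saveToEs(docs, "index/type", cfg);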

Not to mention, unlike the Transport Client, ES-Hadoop is backwards compatible across multiple versions of Elasticsearch since it is based on an internal REST client. This makes your life easier when upgrading. :slight_smile:

Thanks, James.

By the time we're ready to write data to Elastic, we no longer have the RDD in hand; that point is much deeper in the code. The Transport Client works fine for us, with the exception of what I've reported (the exception-swallowing issue).
