Thanks! @james.baiera and @hokiegeek2
My apologies for cross-posting. Below is the code we currently have. I will try your suggestion of rewriting it to use the REST client instead.
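For reference, here is a rough sketch of what I think the REST-client version would look like: the low-level `RestClient` from `org.elasticsearch.client`, pointed at the HTTP port (9200 by default) rather than the transport port 9300 used above. `RestConnection` and the health-check request are just placeholder names, not our final code:

```scala
import org.apache.http.HttpHost
import org.apache.http.util.EntityUtils
import org.elasticsearch.client.RestClient

object RestConnection {
  // Low-level REST client talks to the HTTP port (default 9200),
  // not the transport port 9300 used by PreBuiltTransportClient.
  def conn(): RestClient =
    RestClient.builder(new HttpHost("node1", 9200, "http")).build()
}

// Used the same way as Connection.conn() inside foreachPartition, e.g.:
// val restclient = RestConnection.conn()
// val response = restclient.performRequest("GET", "/_cluster/health")
// DefaultLog.logger.debug(EntityUtils.toString(response.getEntity))
// restclient.close()
```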
Apologies in advance for the long list of exceptions; I am including it at @hokiegeek2's request to show the current problem.
The connection manager for Elasticsearch is as follows:
```scala
import java.net.InetAddress

import org.elasticsearch.common.settings.Settings
import org.elasticsearch.common.transport.TransportAddress
import org.elasticsearch.transport.client.PreBuiltTransportClient

object Connection {
  def conn() = {
    val port = 9300
    val NodeName = "node1"
    val ESClusterName = "escluster"
    val nodes = List(NodeName)
    val addresses = nodes.map { host =>
      new TransportAddress(InetAddress.getByName(host), port)
    }
    val DefaultTimeout = "30s"
    val settings = (new MySettings).settings
    val client = new PreBuiltTransportClient(settings)
      .addTransportAddress(new TransportAddress(InetAddress.getByName(NodeName), port))
    client
  }
}

class MySettings extends Serializable {
  val port = 9300
  val NodeName = "node1"
  val ESClusterName = "escluster"
  val nodes = List(NodeName)
  val addresses = nodes.map { host =>
    new TransportAddress(InetAddress.getByName(host), port)
  }
  val DefaultTimeout = "30s"

  @transient lazy val settingBuilder = Settings.builder()
  val settings = settingBuilder
    .put("cluster.name", ESClusterName)
    .put("client.transport.ping_timeout", DefaultTimeout) // time to wait for a ping response from a node; defaults to 5s
    .build()
}
```
Please note that the MySettings class was needed to create a serializable wrapper around the org.elasticsearch.common.settings.Settings.Builder object (the value returned by Settings.builder()), which otherwise caused object-not-serializable exceptions when the PreBuiltTransportClient was built inside the foreachPartition loop.
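To make the pattern explicit, here is a condensed, illustrative sketch (`SettingsWrapper` is a hypothetical name, not our actual class):

```scala
import org.elasticsearch.common.settings.Settings

// Condensed sketch of the wrapper idea: the @transient lazy val keeps the
// non-serializable Settings.Builder out of anything Spark serializes; the
// builder is re-created on first access in whichever JVM uses the wrapper.
class SettingsWrapper(clusterName: String, pingTimeout: String) extends Serializable {
  @transient private lazy val builder = Settings.builder()
  lazy val settings: Settings = builder
    .put("cluster.name", clusterName)
    .put("client.transport.ping_timeout", pingTimeout)
    .build()
}
```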
The Scala code in Spark that uses this Connection object is as follows:
```scala
import java.io.{PrintWriter, StringWriter}

try {
  // hostname_metric_submetric is a List[Tuple3[String, String, String]]
  val rdd_hostmetric_mapping = sc.parallelize(hostname_metric_submetric)
  rdd_hostmetric_mapping.foreachPartition(iter => {
    val rddclient = Connection.conn()
    iter.foreach(hostmetric => {
      hostmetric._2 match {
        case "cpu" =>
          generateCPUPlots(rddclient, guidid, hostmetric._1, hostmetric._3, labindex)
        case "mem" =>
          generateMemPlots(rddclient, guidid, hostmetric._1, labindex)
        case "io" =>
          generateIoPlots(rddclient, guidid, hostmetric._1, hostmetric._3, labindex)
        case _ =>
          DefaultLog.logger.debug("Unexpected metric")
      }
    })
    rddclient.close()
  })
} catch {
  case ex: Exception =>
    val sw = new StringWriter
    ex.printStackTrace(new PrintWriter(sw))
    DefaultLog.logger.error(sw.toString)
}
```
The exceptions we get after about 80% of the tasks in the foreachPartition stage have completed (note: we are using Elasticsearch 5.5.1 and Spark 2.2.1) are:
```
[ERROR] org.apache.spark.network.server.TransportRequestHandler processRpcRequest - Error while invoking RpcHandler#receive() on RPC id 6276680107286128621
java.io.EOFException
    at java.io.DataInputStream.readFully(DataInputStream.java:197)
    at java.io.DataInputStream.readUTF(DataInputStream.java:609)
    at java.io.DataInputStream.readUTF(DataInputStream.java:564)
    at org.apache.spark.rpc.netty.RequestMessage$.readRpcAddress(NettyRpcEnv.scala:585)
    at org.apache.spark.rpc.netty.RequestMessage$.apply(NettyRpcEnv.scala:595)

[ERROR] org.apache.spark.util.SparkUncaughtExceptionHandler logError - Uncaught exception in thread Thread[elasticsearch[_client_][generic][T#3],5,main]
java.lang.NoSuchMethodError: io.netty.bootstrap.Bootstrap.config()Lio/netty/bootstrap/BootstrapConfig;
    at org.elasticsearch.transport.netty4.Netty4Transport.lambda$stopInternal$5(Netty4Transport.java:443)
    at org.apache.lucene.util.IOUtils.close(IOUtils.java:89)
    at org.elasticsearch.common.lease.Releasables.close(Releasables.java:36)
    at org.elasticsearch.common.lease.Releasables.close(Releasables.java:46)
    at org.elasticsearch.common.lease.Releasables.close(Releasables.java:51)
    at org.elasticsearch.transport.netty4.Netty4Transport.stopInternal(Netty4Transport.java:426)
    at org.elasticsearch.transport.TcpTransport.lambda$doStop$5(TcpTransport.java:959)
    at org.elasticsearch.common.util.concurrent.ThreadContext$ContextPreservingRunnable.run(ThreadContext.java:569)

[ERROR] org.apache.spark.SparkException: Job aborted due to stage failure: Task 8 in stage 0.0 failed 4 times, most recent failure: Lost task 8.3 in stage 0.0 (TID 37, 192.168.200.105, executor 5): java.lang.IllegalStateException: availableProcessors is already set to [8], rejecting [8]
    at io.netty.util.NettyRuntime$AvailableProcessorsHolder.setAvailableProcessors(NettyRuntime.java:51)
    at io.netty.util.NettyRuntime.setAvailableProcessors(NettyRuntime.java:87)
    at org.elasticsearch.transport.netty4.Netty4Utils.setAvailableProcessors(Netty4Utils.java:82)
    at org.elasticsearch.transport.netty4.Netty4Transport.<init>(Netty4Transport.java:138)
    at org.elasticsearch.transport.Netty4Plugin.lambda$getTransports$0(Netty4Plugin.java:93)
    at org.elasticsearch.client.transport.TransportClient.buildTemplate(TransportClient.java:174)
    at org.elasticsearch.client.transport.TransportClient.<init>(TransportClient.java:265)
    at org.elasticsearch.transport.client.PreBuiltTransportClient.<init>(PreBuiltTransportClient.java:130)
    at org.elasticsearch.transport.client.PreBuiltTransportClient.<init>(PreBuiltTransportClient.java:116)
    at org.elasticsearch.transport.client.PreBuiltTransportClient.<init>(PreBuiltTransportClient.java:106)

[ERROR] $ generatePlots - org.apache.spark.SparkException: Job aborted due to stage failure: Task 3 in stage 0.0 failed 4 times, most recent failure: Lost task 3.3 in stage 0.0 (TID 21, 192.168.200.105, executor 2): ExecutorLostFailure (executor 2 exited caused by one of the running tasks) Reason: Remote RPC client disassociated. Likely due to containers exceeding thresholds, or network issues. Check driver logs for WARN messages.
Driver stacktrace:
    at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$failJobAndIndependentStages(DAGScheduler.scala:1517)
    at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1505)
    at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1504)
```