Thanks, @james.baiera and @hokiegeek2!
My apologies for cross-posting. Please find our current code below; I will try your suggestion of rewriting it to use the REST client instead, and I have included a rough sketch of that rewrite after the connection code.
Apologies in advance for the long list of exceptions; I am including them per @hokiegeek2's request to show the current problem.
The connection manager for Elasticsearch is as follows:
import java.net.InetAddress

import org.elasticsearch.common.settings.Settings
import org.elasticsearch.common.transport.TransportAddress
import org.elasticsearch.transport.client.PreBuiltTransportClient

object Connection {
  def conn() = {
    val port = 9300
    val NodeName = "node1"
    val ESClusterName = "escluster"
    val nodes = List(NodeName)
    // Note: addresses is computed but never used; the client below is pointed at NodeName directly
    val addresses = nodes.map { host => new TransportAddress(InetAddress.getByName(host), port) }
    val DefaultTimeout = "30s"
    val settings = (new MySettings).settings
    val client = new PreBuiltTransportClient(settings)
      .addTransportAddress(new TransportAddress(InetAddress.getByName(NodeName), port))
    client
  }
}

class MySettings extends Serializable {
  val port = 9300
  val NodeName = "node1"
  val ESClusterName = "escluster"
  val nodes = List(NodeName)
  val addresses = nodes.map { host => new TransportAddress(InetAddress.getByName(host), port) }
  val DefaultTimeout = "30s"
  // Settings.Builder is not serializable, so keep it out of the serialized state
  @transient lazy val settingBuilder = Settings.builder()
  val settings = settingBuilder
    .put("cluster.name", ESClusterName)
    .put("client.transport.ping_timeout", DefaultTimeout) // time to wait for a ping response from a node; defaults to 5s
    .build()
}
Please note that the MySettings class was needed as a serializable wrapper around the org.elasticsearch.common.settings.Settings.Builder object (the value of Settings.builder()), which otherwise caused object-not-serializable exceptions when a PreBuiltTransportClient was built inside the foreachPartition loop.
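For reference, here is the rough, untested sketch of what a REST-based Connection might look like with the 5.5 low-level REST client (org.elasticsearch.client:rest). The hostname and the HTTP port 9200 (rather than the transport port 9300) are assumptions for illustration:

import org.apache.http.HttpHost
import org.elasticsearch.client.RestClient

object RestConnection {
  // Same create-per-partition pattern as Connection.conn() above, but over HTTP;
  // "node1" and 9200 are placeholders for the real cluster coordinates.
  def conn(): RestClient =
    RestClient
      .builder(new HttpHost("node1", 9200, "http"))
      .setMaxRetryTimeoutMillis(30000) // roughly mirrors the 30s timeout above
      .build()
}

Since the low-level client is a plain Closeable HTTP wrapper, it should bypass the Netty transport stack that shows up in the exceptions below.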
The Spark (Scala) code that uses this Connection object is as follows:
try {
  // hostname_metric_submetric is a List[(String, String, String)] of (host, metric, submetric)
  val rdd_hostmetric_mapping = sc.parallelize(hostname_metric_submetric)
  rdd_hostmetric_mapping.foreachPartition { iter =>
    // One TransportClient per partition, created on the executor
    val rddclient = Connection.conn()
    try {
      iter.foreach { hostmetric =>
        hostmetric._2 match {
          case "cpu" =>
            generateCPUPlots(rddclient, guidid, hostmetric._1, hostmetric._3, labindex)
          case "mem" =>
            generateMemPlots(rddclient, guidid, hostmetric._1, labindex)
          case "io" =>
            generateIoPlots(rddclient, guidid, hostmetric._1, hostmetric._3, labindex)
          case _ =>
            DefaultLog.logger.debug("Unexpected metric")
        }
      }
    } finally {
      rddclient.close() // close even if a generate*Plots call throws
    }
  }
} catch {
  case ex: Exception =>
    val sw = new StringWriter
    ex.printStackTrace(new PrintWriter(sw))
    DefaultLog.logger.error(sw.toString)
}
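Once the REST rewrite is done, we expect the partition loop to look roughly like this (untested; assumes the generate*Plots helpers are reworked to accept a RestClient):

rdd_hostmetric_mapping.foreachPartition { iter =>
  val restClient = RestConnection.conn()
  try {
    iter.foreach { hostmetric =>
      hostmetric._2 match {
        case "cpu" => generateCPUPlots(restClient, guidid, hostmetric._1, hostmetric._3, labindex)
        case "mem" => generateMemPlots(restClient, guidid, hostmetric._1, labindex)
        case "io"  => generateIoPlots(restClient, guidid, hostmetric._1, hostmetric._3, labindex)
        case _     => DefaultLog.logger.debug("Unexpected metric")
      }
    }
  } finally {
    restClient.close() // RestClient implements Closeable
  }
}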
The stack traces we get after roughly 80% of the tasks in the foreachPartition loop have completed (note: we are on Elasticsearch 5.5.1 and Spark 2.2.1) are:
[ERROR] org.apache.spark.network.server.TransportRequestHandler processRpcRequest - Error while invoking RpcHandler#receive() on RPC id 6276680107286128621
java.io.EOFException
at java.io.DataInputStream.readFully(DataInputStream.java:197)
at java.io.DataInputStream.readUTF(DataInputStream.java:609)
at java.io.DataInputStream.readUTF(DataInputStream.java:564)
at org.apache.spark.rpc.netty.RequestMessage$.readRpcAddress(NettyRpcEnv.scala:585)
at org.apache.spark.rpc.netty.RequestMessage$.apply(NettyRpcEnv.scala:595)
[ERROR] org.apache.spark.util.SparkUncaughtExceptionHandler logError - Uncaught exception in thread Thread[elasticsearch[_client_][generic][T#3],5,main]
java.lang.NoSuchMethodError: io.netty.bootstrap.Bootstrap.config()Lio/netty/bootstrap/BootstrapConfig;
at org.elasticsearch.transport.netty4.Netty4Transport.lambda$stopInternal$5(Netty4Transport.java:443)
at org.apache.lucene.util.IOUtils.close(IOUtils.java:89)
at org.elasticsearch.common.lease.Releasables.close(Releasables.java:36)
at org.elasticsearch.common.lease.Releasables.close(Releasables.java:46)
at org.elasticsearch.common.lease.Releasables.close(Releasables.java:51)
at org.elasticsearch.transport.netty4.Netty4Transport.stopInternal(Netty4Transport.java:426)
at org.elasticsearch.transport.TcpTransport.lambda$doStop$5(TcpTransport.java:959)
at org.elasticsearch.common.util.concurrent.ThreadContext$ContextPreservingRunnable.run(ThreadContext.java:569)
[ERROR] org.apache.spark.SparkException: Job aborted due to stage failure: Task 8 in stage 0.0 failed 4 times, most recent failure: Lost task 8.3 in stage 0.0 (TID 37, 192.168.200.105, executor 5): java.lang.IllegalStateException: availableProcessors is already set to [8], rejecting [8]
at io.netty.util.NettyRuntime$AvailableProcessorsHolder.setAvailableProcessors(NettyRuntime.java:51)
at io.netty.util.NettyRuntime.setAvailableProcessors(NettyRuntime.java:87)
at org.elasticsearch.transport.netty4.Netty4Utils.setAvailableProcessors(Netty4Utils.java:82)
at org.elasticsearch.transport.netty4.Netty4Transport.&lt;init&gt;(Netty4Transport.java:138)
at org.elasticsearch.transport.Netty4Plugin.lambda$getTransports$0(Netty4Plugin.java:93)
at org.elasticsearch.client.transport.TransportClient.buildTemplate(TransportClient.java:174)
at org.elasticsearch.client.transport.TransportClient.&lt;init&gt;(TransportClient.java:265)
at org.elasticsearch.transport.client.PreBuiltTransportClient.&lt;init&gt;(PreBuiltTransportClient.java:130)
at org.elasticsearch.transport.client.PreBuiltTransportClient.&lt;init&gt;(PreBuiltTransportClient.java:116)
at org.elasticsearch.transport.client.PreBuiltTransportClient.&lt;init&gt;(PreBuiltTransportClient.java:106)
[ERROR] $ generatePlots - org.apache.spark.SparkException: Job aborted due to stage failure: Task 3 in stage 0.0 failed 4 times, most recent failure: Lost task 3.3 in stage 0.0 (TID 21, 192.168.200.105, executor 2): ExecutorLostFailure (executor 2 exited caused by one of the running tasks) Reason: Remote RPC client disassociated. Likely due to containers exceeding thresholds, or network issues. Check driver logs for WARN messages.
Driver stacktrace:
at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$failJobAndIndependentStages(DAGScheduler.scala:1517)
at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1505)
at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1504)
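One more note on the IllegalStateException above: as far as I can tell, Netty4Utils.setAvailableProcessors throws when it is called a second time in the same JVM, which matches our code constructing a new PreBuiltTransportClient for every partition an executor processes. Until the REST client rewrite lands, a per-JVM singleton might avoid that particular failure; a minimal, untested sketch:

import org.elasticsearch.client.transport.TransportClient

object SharedConnection {
  // A Scala object is created once per JVM, so this lazy val yields exactly
  // one TransportClient per executor; do not close it inside foreachPartition,
  // let it live for the lifetime of the executor JVM.
  lazy val client: TransportClient = Connection.conn()
}

(Newer client versions also expose an es.set.netty.runtime.available.processors system property to skip that call, though I am not sure it exists in 5.5.1.) The NoSuchMethodError on io.netty.bootstrap.Bootstrap.config() also suggests a Netty version clash, since that method only exists in Netty 4.1 while Spark 2.2 ships an older 4.0.x on the executor classpath; that is another reason the REST client route looks attractive to us.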