Serializing Elasticsearch clients for use when looping through Spark RDDs

Thanks! @james.baiera and @hokiegeek2

My apologies for cross-posting. The current code we have is below. I will try your suggestion of rewriting it to use the REST client instead; a rough sketch of what we have in mind is at the end of this post.

Apologies in advance for the long list of exceptions; I am including it at @hokiegeek2's request to show the current problem.

The connection manager to Elasticsearch is as follows:

import java.net.InetAddress

import org.elasticsearch.common.settings.Settings
import org.elasticsearch.common.transport.TransportAddress
import org.elasticsearch.transport.client.PreBuiltTransportClient

object Connection {
  // Builds a fresh transport client; called once per partition on the executors.
  def conn() = {
    val port = 9300
    val NodeName = "node1"
    val ESClusterName = "escluster"
    val nodes = List(NodeName)
    val addresses = nodes.map { host => new TransportAddress(InetAddress.getByName(host), port) }
    val DefaultTimeout = "30s"

    val settings = (new MySettings).settings
    val client = new PreBuiltTransportClient(settings)
      .addTransportAddress(new TransportAddress(InetAddress.getByName(NodeName), port))
    client
  }
}

class MySettings extends Serializable {
  val port = 9300
  val NodeName = "node1"
  val ESClusterName = "escluster"
  val nodes = List(NodeName)
  val addresses = nodes.map { host => new TransportAddress(InetAddress.getByName(host), port) }
  val DefaultTimeout = "30s"
  @transient lazy val settingBuilder = Settings.builder()
  val settings = settingBuilder
    .put("cluster.name", ESClusterName)
    .put("client.transport.ping_timeout", DefaultTimeout) //The time to wait for a ping response from a node. Defaults to 5s
    .build()
}

Please note that the MySettings class was needed as a serializable wrapper around the org.elasticsearch.common.settings.Settings.Builder object (the value returned by Settings.builder()), which otherwise caused object-not-serializable exceptions when a PreBuiltTransportClient was built inside the job_list_RDD.foreachPartition loop.
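
For what it is worth, the pattern we relied on is essentially the one sketched below (a minimal, generic sketch with names of our own choosing; LazyHolder is not from Elasticsearch or Spark):

class LazyHolder[T](build: () => T) extends Serializable {
  // A serializable holder for a resource that cannot itself be serialized.
  // Because the field is @transient lazy, the resource is not shipped with the
  // Spark closure; it is rebuilt on each executor the first time it is accessed there.
  @transient lazy val value: T = build()
}

// Hypothetical usage on the driver; value is only touched inside the partition loop:
// val settingsHolder = new LazyHolder(() => Settings.builder().put("cluster.name", "escluster").build())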

The Scala code in Spark that uses this Connection object is as follows:

try {
  // hostname_metric_submetric is a List[Tuple3[String, String, String]]
  val rdd_hostmetric_mapping = sc.parallelize(hostname_metric_submetric)
  rdd_hostmetric_mapping.foreachPartition(iter => {
    val rddclient = Connection.conn()
    iter.foreach(hostmetric => {
      hostmetric._2 match {
        case "cpu" => {
          generateCPUPlots(rddclient, guidid, hostmetric._1, hostmetric._3, labindex)
        }
        case "mem" => {
          generateMemPlots(rddclient, guidid, hostmetric._1, labindex)
        }
        case "io" => {
          generateIoPlots(rddclient, guidid, hostmetric._1, hostmetric._3, labindex)
        }
        case _ => {
          DefaultLog.logger.debug("Unexpected metric")
        }
      }
    })
    rddclient.close()
  })
} catch {
  case (ex: Exception) => {
    val sw = new StringWriter
    ex.printStackTrace(new PrintWriter(sw))
    DefaultLog.logger.error(sw.toString)
  }
}

The exceptions we get after about 80% of the tasks in the foreachPartition loop have completed (note: we are using Elasticsearch 5.5.1 and Spark 2.2.1) are:

[ERROR] org.apache.spark.network.server.TransportRequestHandler processRpcRequest - Error while invoking RpcHandler#receive() on RPC id 6276680107286128621
java.io.EOFException
        at java.io.DataInputStream.readFully(DataInputStream.java:197)
        at java.io.DataInputStream.readUTF(DataInputStream.java:609)
        at java.io.DataInputStream.readUTF(DataInputStream.java:564)
        at org.apache.spark.rpc.netty.RequestMessage$.readRpcAddress(NettyRpcEnv.scala:585)
        at org.apache.spark.rpc.netty.RequestMessage$.apply(NettyRpcEnv.scala:595)
[ERROR] org.apache.spark.util.SparkUncaughtExceptionHandler logError - Uncaught exception in thread Thread[elasticsearch[_client_][generic][T#3],5,main]
java.lang.NoSuchMethodError: io.netty.bootstrap.Bootstrap.config()Lio/netty/bootstrap/BootstrapConfig;
        at org.elasticsearch.transport.netty4.Netty4Transport.lambda$stopInternal$5(Netty4Transport.java:443)
        at org.apache.lucene.util.IOUtils.close(IOUtils.java:89)
        at org.elasticsearch.common.lease.Releasables.close(Releasables.java:36)
        at org.elasticsearch.common.lease.Releasables.close(Releasables.java:46)
        at org.elasticsearch.common.lease.Releasables.close(Releasables.java:51)
        at org.elasticsearch.transport.netty4.Netty4Transport.stopInternal(Netty4Transport.java:426)
        at org.elasticsearch.transport.TcpTransport.lambda$doStop$5(TcpTransport.java:959)
        at org.elasticsearch.common.util.concurrent.ThreadContext$ContextPreservingRunnable.run(ThreadContext.java:569)
[ERROR] org.apache.spark.SparkException: Job aborted due to stage failure: Task 8 in stage 0.0 failed 4 times, most recent failure: Lost task 8.3 in stage 0.0 (TID 37, 192.168.200.105, executor 5): java.lang.IllegalStateException: availableProcessors is already set to [8], rejecting [8]
        at io.netty.util.NettyRuntime$AvailableProcessorsHolder.setAvailableProcessors(NettyRuntime.java:51)
        at io.netty.util.NettyRuntime.setAvailableProcessors(NettyRuntime.java:87)
        at org.elasticsearch.transport.netty4.Netty4Utils.setAvailableProcessors(Netty4Utils.java:82)
        at org.elasticsearch.transport.netty4.Netty4Transport.<init>(Netty4Transport.java:138)
        at org.elasticsearch.transport.Netty4Plugin.lambda$getTransports$0(Netty4Plugin.java:93)
        at org.elasticsearch.client.transport.TransportClient.buildTemplate(TransportClient.java:174)
        at org.elasticsearch.client.transport.TransportClient.<init>(TransportClient.java:265)
        at org.elasticsearch.transport.client.PreBuiltTransportClient.<init>(PreBuiltTransportClient.java:130)
        at org.elasticsearch.transport.client.PreBuiltTransportClient.<init>(PreBuiltTransportClient.java:116)
        at org.elasticsearch.transport.client.PreBuiltTransportClient.<init>(PreBuiltTransportClient.java:106)
[ERROR] $ generatePlots - org.apache.spark.SparkException: Job aborted due to stage failure: Task 3 in stage 0.0 failed 4 times, most recent failure: Lost task 3.3 in stage 0.0 (TID 21, 192.168.200.105, executor 2): ExecutorLostFailure (executor 2 exited caused by one of the running tasks) Reason: Remote RPC client disassociated. Likely due to containers exceeding thresholds, or network issues. Check driver logs for WARN messages.
Driver stacktrace:
        at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$failJobAndIndependentStages(DAGScheduler.scala:1517)
        at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1505)
        at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1504)
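
As mentioned at the top, we will try rewriting Connection to use the REST client instead of the transport client. A minimal sketch of what we have in mind is below (untested; it assumes the low-level org.elasticsearch.client.RestClient that ships with 5.x, and the host name and port are placeholders for our setup):

import org.apache.http.HttpHost
import org.elasticsearch.client.RestClient

object RestConnection {
  // The low-level REST client talks to the HTTP port (9200 by default) rather than the
  // transport port 9300, and is built on Apache HttpAsyncClient instead of the Netty 4 transport.
  def conn(): RestClient =
    RestClient.builder(new HttpHost("node1", 9200, "http")).build()
}

// Inside foreachPartition the shape would stay the same: build once per partition, close at the end.
// val restClient = RestConnection.conn()
// ... e.g. restClient.performRequest("GET", "/_cluster/health") ...
// restClient.close()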