Serializing Elasticsearch clients for use when looping through Spark RDDs

Opening and closing connections for Elasticsearch TransportClients
We need to distribute tasks across several machines, hence the use of rdd partitions to distribute the dataset across the nodes. Each node operating on its part of the dataset with foreachPartition does the following (a minimal sketch of this pattern follows the list):

  • open a connection to Elasticsearch,
  • query Elasticsearch for the specific data referenced by the RDD records,
  • use that data to generate some graphs,
  • close the connection.
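
Sketch of the pattern (openEsClient, queryEs, and generatePlot are placeholder names, not our actual helpers; the real code is posted further down in this thread):

rdd.foreachPartition { partition =>
  val client = openEsClient()            // one Elasticsearch client per partition
  try {
    partition.foreach { record =>
      val data = queryEs(client, record) // query Elasticsearch for this record
      generatePlot(data)                 // use the result to build a graph
    }
  } finally {
    client.close()                       // always release the connection
  }
}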

After processing some portion of the dataset, the Apache Spark job fails with the exceptions listed in the StackOverflow post (the full trace is also reproduced further down in this thread).

Please suggest what we can do to fix this. We are not sure whether this is a limitation of the Elasticsearch client or whether there is something wrong with the way our Apache Spark job works with partitions.

Please note that we are not using elasticsearch-spark; we are using the Java APIs provided by the org.elasticsearch/elasticsearch and org.elasticsearch.client/transport jars.

Thanks in advance.

Hi,

It sounds like you are writing a non-serializable object out to an RDD. Please post your code along with the stack trace.
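
For illustration, a hypothetical sketch of that failure mode and the usual fix (buildClient and doSomething are placeholders, not code from your job):

// Anti-pattern: the client is created on the driver and captured by the closure,
// so Spark tries to serialize it when shipping tasks to the executors.
val client = buildClient()
rdd.foreachPartition { iter =>
  iter.foreach(doc => doSomething(client, doc)) // fails: Task not serializable
}

// Usual fix: create the client inside the closure so it only ever lives on an executor.
rdd.foreachPartition { iter =>
  val client = buildClient()
  try iter.foreach(doc => doSomething(client, doc))
  finally client.close()
}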

Thanks

--John

@pavocristatus Please avoid cross-posting from StackOverflow. It makes responding to issues difficult when information is split across two sites. Additionally, readers who may be experiencing your problem may have a hard time finding this post if it does not contain the same content. Going forward, I request that you post the issue/question in full here.

I suggest avoiding the Transport client with Spark. The Transport client depends on Netty, which Spark also depends on. We ran into issues at the start of the 5.0 line in our ES-Hadoop testing because the Netty versions in Spark and Elasticsearch did not agree with each other. I would start by trying the REST client that is now provided for Java and seeing if that helps at all.
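
For example, something along these lines with the low-level REST client should avoid the Netty clash entirely (the host, port, and the /metrics/_search endpoint here are just placeholders):

import org.apache.http.HttpHost
import org.elasticsearch.client.RestClient

rdd.foreachPartition { iter =>
  // The REST client speaks HTTP on port 9200 and does not use the Netty 4 transport.
  val restClient = RestClient.builder(new HttpHost("node1", 9200, "http")).build()
  try {
    iter.foreach { record =>
      val response = restClient.performRequest("GET", "/metrics/_search") // placeholder index
      // parse response.getEntity and generate the graphs here
    }
  } finally {
    restClient.close()
  }
}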

Thanks! @james.baiera and @hokiegeek2

My apologies for cross-posting. Please find below the code we currently have. I will try your suggestion and rewrite it to use the REST client instead.

Apologies in advance for the long list of exceptions; I am providing it, per @hokiegeek2's request, for the current problem.

The connection manager to Elasticsearch is as follows:

import java.net.InetAddress

import org.elasticsearch.common.settings.Settings
import org.elasticsearch.common.transport.TransportAddress
import org.elasticsearch.transport.client.PreBuiltTransportClient

object Connection {
  // Builds a new TransportClient; called once per partition on the executors.
  def conn() = {
    val port = 9300
    val NodeName = "node1"
    val ESClusterName = "escluster"
    val nodes = List(NodeName)
    val addresses = nodes.map { host => new TransportAddress(InetAddress.getByName(host), port) }
    val DefaultTimeout = "30s"

    val settings = (new MySettings).settings
    val client = new PreBuiltTransportClient(settings)
      .addTransportAddress(new TransportAddress(InetAddress.getByName(NodeName), port))
    client
  }
}

// Serializable wrapper around the non-serializable Settings.Builder (see note below).
class MySettings extends Serializable {
  val port = 9300
  val NodeName = "node1"
  val ESClusterName = "escluster"
  val nodes = List(NodeName)
  val addresses = nodes.map { host => new TransportAddress(InetAddress.getByName(host), port) }
  val DefaultTimeout = "30s"
  @transient lazy val settingBuilder = Settings.builder()
  val settings = settingBuilder
    .put("cluster.name", ESClusterName)
    .put("client.transport.ping_timeout", DefaultTimeout) // time to wait for a ping response from a node; defaults to 5s
    .build()
}

Please note that the MySettings class was needed to provide a serializable wrapper around the org.elasticsearch.common.settings.Settings.Builder object (the value of Settings.builder()), which otherwise caused object-not-serializable exceptions when a PreBuiltTransportClient was built within the job_list_RDD.foreachPartition loop.

The Scala code in Spark that uses this Connection object is as follows:

try {
  val rdd_hostmetric_mapping = sc.parallelize(hostname_metric_submetric) // a List[Tuple3[String, String, String]]
  rdd_hostmetric_mapping.foreachPartition { iter =>
    val rddclient = Connection.conn() // one TransportClient per partition
    iter.foreach { hostmetric =>
      hostmetric._2 match {
        case "cpu" =>
          generateCPUPlots(rddclient, guidid, hostmetric._1, hostmetric._3, labindex)
        case "mem" =>
          generateMemPlots(rddclient, guidid, hostmetric._1, labindex)
        case "io" =>
          generateIoPlots(rddclient, guidid, hostmetric._1, hostmetric._3, labindex)
        case _ =>
          DefaultLog.logger.debug("Unexpected metric")
      }
    }
    rddclient.close()
  }
} catch {
  case ex: Exception =>
    val sw = new StringWriter
    ex.printStackTrace(new PrintWriter(sw))
    DefaultLog.logger.error(sw.toString)
}

The stack trace with the exceptions we get after about 80% of the tasks in the foreachPartition loop have completed (note: we are using Elasticsearch 5.5.1 and Spark 2.2.1) is:

[ERROR] org.apache.spark.network.server.TransportRequestHandler processRpcRequest - Error while invoking RpcHandler#receive() on RPC id 6276680107286128621
java.io.EOFException
        at java.io.DataInputStream.readFully(DataInputStream.java:197)
        at java.io.DataInputStream.readUTF(DataInputStream.java:609)
        at java.io.DataInputStream.readUTF(DataInputStream.java:564)
        at org.apache.spark.rpc.netty.RequestMessage$.readRpcAddress(NettyRpcEnv.scala:585)
        at org.apache.spark.rpc.netty.RequestMessage$.apply(NettyRpcEnv.scala:595)
[ERROR] org.apache.spark.util.SparkUncaughtExceptionHandler logError - Uncaught exception in thread Thread[elasticsearch[_client_][generic][T#3],5,main]
java.lang.NoSuchMethodError: io.netty.bootstrap.Bootstrap.config()Lio/netty/bootstrap/BootstrapConfig;
        at org.elasticsearch.transport.netty4.Netty4Transport.lambda$stopInternal$5(Netty4Transport.java:443)
        at org.apache.lucene.util.IOUtils.close(IOUtils.java:89)
        at org.elasticsearch.common.lease.Releasables.close(Releasables.java:36)
        at org.elasticsearch.common.lease.Releasables.close(Releasables.java:46)
        at org.elasticsearch.common.lease.Releasables.close(Releasables.java:51)
        at org.elasticsearch.transport.netty4.Netty4Transport.stopInternal(Netty4Transport.java:426)
        at org.elasticsearch.transport.TcpTransport.lambda$doStop$5(TcpTransport.java:959)
        at org.elasticsearch.common.util.concurrent.ThreadContext$ContextPreservingRunnable.run(ThreadContext.java:569)
[ERROR] org.apache.spark.SparkException: Job aborted due to stage failure: Task 8 in stage 0.0 failed 4 times, most recent failure: Lost task 8.3 in stage 0.0 (TID 37, 192.168.200.105, executor 5): java.lang.IllegalStateException: availableProcessors is already set to [8], rejecting [8]
        at io.netty.util.NettyRuntime$AvailableProcessorsHolder.setAvailableProcessors(NettyRuntime.java:51)
        at io.netty.util.NettyRuntime.setAvailableProcessors(NettyRuntime.java:87)
        at org.elasticsearch.transport.netty4.Netty4Utils.setAvailableProcessors(Netty4Utils.java:82)
        at org.elasticsearch.transport.netty4.Netty4Transport.<init>(Netty4Transport.java:138)
        at org.elasticsearch.transport.Netty4Plugin.lambda$getTransports$0(Netty4Plugin.java:93)
        at org.elasticsearch.client.transport.TransportClient.buildTemplate(TransportClient.java:174)
        at org.elasticsearch.client.transport.TransportClient.<init>(TransportClient.java:265)
        at org.elasticsearch.transport.client.PreBuiltTransportClient.<init>(PreBuiltTransportClient.java:130)
        at org.elasticsearch.transport.client.PreBuiltTransportClient.<init>(PreBuiltTransportClient.java:116)
        at org.elasticsearch.transport.client.PreBuiltTransportClient.<init>(PreBuiltTransportClient.java:106)
[ERROR] $ generatePlots - org.apache.spark.SparkException: Job aborted due to stage failure: Task 3 in stage 0.0 failed 4 times, most recent failure: Lost task 3.3 in stage 0.0 (TID 21, 192.168.200.105, executor 2): ExecutorLostFailure (executor 2 exited caused by one of the running tasks) Reason: Remote RPC client disassociated. Likely due to containers exceeding thresholds, or network issues. Check driver logs for WARN messages.
Driver stacktrace:
        at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$failJobAndIndependentStages(DAGScheduler.scala:1517)
        at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1505)
        at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1504)

Yeah, I can't remember the exact issues we were seeing with the colliding Netty jars, but those BootstrapConfig error traces look really familiar to me.

Additionally, you might find the REST client easier to work with in some ways, mostly in that it doesn't require lockstep versioning with ES.

Thanks! The RestHighLevelClient from Elasticsearch 6.2.3 works like a charm. I rewrote the application to use it:

import org.apache.http.HttpHost
import org.elasticsearch.client.{RestClient, RestHighLevelClient}

class RestManager extends Serializable {
  // The high-level REST client wraps the low-level RestClient and talks HTTP on port 9200.
  val client: RestHighLevelClient = new RestHighLevelClient(
    RestClient.builder(
      new HttpHost("node0", 9200, "http")))
}
object myRestClient extends RestManager
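
A simplified sketch of the per-partition lookup with this client (the index name "metrics" and the "host" field below are illustrative placeholders, not our real mapping):

import org.elasticsearch.action.search.SearchRequest
import org.elasticsearch.index.query.QueryBuilders
import org.elasticsearch.search.builder.SearchSourceBuilder

rdd_hostmetric_mapping.foreachPartition { iter =>
  iter.foreach { hostmetric =>
    // Build a query for this host; the 6.2.3 high-level client accepts a plain SearchRequest.
    val request = new SearchRequest("metrics").source(
      new SearchSourceBuilder().query(QueryBuilders.termQuery("host", hostmetric._1)))
    val response = myRestClient.client.search(request)
    // response.getHits feeds the plot generation as before
  }
}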

I do not see any BootstrapConfig errors or any other exceptions in the Spark log.
