Spark - querying an Elasticsearch cluster over an RDD

Hello.

I am trying to query an Elasticsearch cluster for each record in an RDD, i.e. for a 10-million-record RDD, map a function over it that queries a remote Elasticsearch cluster via the transport client.

I've tried using both map and mapPartitions (to ensure only one transport client is created per partition), but I still get a "No node available" exception. My Spark config is as follows:

  • 1 machine - I have tried using both 4g and 8g driver memory
  • num-executor=4

Note: when num-executor=1 it works fine, but then I guess it's no different from running it on one machine without Spark... so ideally I would like to execute this with num-executor > 1.

Has anyone tried doing this before? Any suggestions to solve this issue will be greatly appreciated. Thanks!

Without any configuration or logging information, this is just a guessing game. Please see the aforementioned page and provide the suggested information to help with diagnostics.

Everything's pretty out of the box/vanilla.
Using EMR-4.0.0
Hadoop distribution: Amazon 2.6.0
Applications: Spark 1.4.1

  • Spark cluster - 1 node, driver memory 4g. Input text file has 20 partitions.
  • Elasticsearch - 1 node; for simplicity's sake I've put it on the same box as the Spark cluster.

In my Spark job I do something like the following:

// this version (only using map) works when the spark job has just 1 executor-core
val dataset = sc.textFile(....)
val results = dataset.map(i => {
    val settings = ImmutableSettings.settingsBuilder()
        .put("cluster.name", ....)
        .put("client.transport.ping_timeout", 60)
        .put("client.transport.nodes_sampler_interval", 60)
        .build()
    val esClient = new TransportClient(settings)
        .addTransportAddresses(new InetSocketTransportAddress(...., 9300))
    val result = s"hello world $i" // append result of querying the Elasticsearch cluster using esClient
    esClient.close()
    result
})


// this version (using mapPartitions and map) does not work at all
val dataset = sc.textFile(....)
val results = dataset.mapPartitions(i => {
    val settings = ImmutableSettings.settingsBuilder()
        .put("cluster.name", ....)
        .put("client.transport.ping_timeout", 60)
        .put("client.transport.nodes_sampler_interval", 60)
        .build()
    val esClient = new TransportClient(settings)
        .addTransportAddresses(new InetSocketTransportAddress(...., 9300))
    val result = i.map(j => { .... })
    esClient.close()
    result
})
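One thing worth checking in the mapPartitions version: `Iterator.map` in Scala is lazy, so if the returned `result` is built with `i.map(...)`, the queries only run when Spark consumes the iterator, after `esClient.close()` has already been called. A minimal plain-Scala sketch (no Spark, and `FakeClient` is a hypothetical stand-in for the transport client) of that pitfall and a fix that materializes the partition's results before closing:

```scala
// FakeClient is a hypothetical stand-in for a client that fails once closed.
class FakeClient {
  private var open = true
  def query(s: String): String = {
    require(open, "client is closed") // loosely mimics "No node available" after close
    s"result for $s"
  }
  def close(): Unit = { open = false }
}

// Buggy shape: Iterator.map is lazy, so query() runs only when the iterator
// is consumed -- after close() has already been called.
def buggy(part: Iterator[String]): Iterator[String] = {
  val client = new FakeClient
  val out = part.map(client.query) // nothing executed yet
  client.close()
  out // consuming this throws
}

// Fixed shape: materialize the partition's results before closing the client.
def fixed(part: Iterator[String]): Iterator[String] = {
  val client = new FakeClient
  val out = part.map(client.query).toList // forces the queries to run now
  client.close()
  out.iterator
}
```

In Spark the same shape applies inside `dataset.mapPartitions { part => ... }`: force the queries (e.g. with `toList`) before closing the client, then return the materialized results as an iterator.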

I figured out what's wrong: I cannot use the same transport client to prepare multiple queries. Is there an option I can set to allow this? Thanks.

@bug, I've formatted your post to make it readable. Please do so yourself in the future.

You are using the transport client yourself - likely the Spark nodes that actually execute the tasks cannot properly connect to the Elasticsearch cluster, or the ES cluster is significantly smaller and, under load (too many Spark tasks), becomes overloaded; the JVM then spends its time in GC and becomes unresponsive (which makes it look like it is not there any more).

Do note that ES-Hadoop takes care of all this and relies on the REST interface. In your case, you are not using ES-Hadoop but rather doing things yourself.
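For reference, a minimal sketch of the ES-Hadoop route, assuming the elasticsearch-spark connector is on the classpath; the host, index name, and query string below are placeholders, not values from this thread:

```scala
// Sketch only: requires the elasticsearch-spark (ES-Hadoop) connector on the
// classpath. Host, index, and query are illustrative placeholders.
import org.apache.spark.{SparkConf, SparkContext}
import org.elasticsearch.spark._ // adds esRDD to SparkContext

val conf = new SparkConf()
  .setAppName("es-query")
  .set("es.nodes", "localhost") // REST endpoint (port 9200), not the transport port
  .set("es.port", "9200")
val sc = new SparkContext(conf)

// Reads go through the REST interface; the connector manages connections per
// partition, so no TransportClient lifecycle handling is needed in user code.
val hits = sc.esRDD("myindex/mytype", "?q=hello")
```

The connector also co-locates tasks with shards where possible, which avoids the per-record client setup shown earlier in the thread.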