Large collocated Spark/ES cluster, concerns about data transfer

Hi all,

I want to use Spark to query ES, but I have a doubt about how data is transferred back and forth between ES nodes and Spark nodes on a large cluster setup, and how node affinity between both worlds works.
We are setting up a large cluster of machines where every node will run both a Spark instance and an ES instance.

When we launch a big query from Spark, is the es-hadoop connector able to understand the topology (shard distribution) of both clusters?
Our main concern is to avoid a situation where a query from the Spark cluster to the ES cluster first has to fetch the whole ES dataset onto a single node, only to redistribute that data to the very same machines for Spark processing.

For that, we need to be sure that a Spark job running a simple-yet-big query (no aggregation!) will have its RDD distribution across the cluster driven by the way ES has distributed the corresponding indices.
In other words, every Spark node should receive the partial data that is collocated with it in the local ES node. This way, the ES data is turned into a Spark RDD without anything being transferred over the network.
Every ES node should know about and communicate with its collocated Spark node (we call this, perhaps wrongly, "node affinity" between both worlds).

Can you confirm that such a mechanism exists? And if so, where can we find information about the constraints and conditions required to make it work?

We understand that such an "affinity" system exists between ES and Hadoop, for instance when using Hive to query ES data.

Hello @rzrucher!

When launching a job using Spark, the connector determines the locations of the shards in Elasticsearch that it will be targeting and creates a partition per shard (or, if you are running on v5.0.0, it will subdivide the shards further to allow for greater parallelism). Each of these partition definitions carries with it the index name, the shard id, the slice id (v5.0.0), and the addresses of the machines where that data can be found locally. It then relies on Spark's task scheduling to achieve data locality.
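If it helps to see this in action, here is a minimal sketch (Scala, run from a spark-shell that already has the es-spark jar on its classpath) that reads an index and prints the locality hints attached to each partition. The index name is a placeholder, and the connector is assumed to be using its default `es.nodes`/`es.port` settings:

```scala
// Minimal sketch: inspect the partition-per-shard mapping and the locality
// hints the connector hands to Spark. "my-index/my-type" is a placeholder.
import org.elasticsearch.spark._   // adds esRDD(...) to the SparkContext

val rdd = sc.esRDD("my-index/my-type")   // `sc` is provided by spark-shell

// One partition per shard (or per slice on 5.0.0+).
println(s"number of partitions = ${rdd.partitions.length}")

// Each partition advertises the host(s) of its shard; Spark's scheduler uses
// these preferred locations to place the reading task on the same machine.
rdd.partitions.foreach { p =>
  println(s"partition ${p.index} -> ${rdd.preferredLocations(p).mkString(", ")}")
}
```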

Spark will stand up a task for each of the input partitions, and each reading task is pinned to a node that hosts its shard. This just means that the task will always try to read from that node first, but will fall back to other nodes if that node fails. When reading, the task only touches the data in the shard it is assigned to, by using the preference feature on the search request. This lets us skip the coordination step of the query phase and thus skip the need to funnel results to a single node for combining. Data is shipped directly back to Spark.
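To make the "preference" part concrete, here is roughly the idea of a shard-scoped read expressed as a plain search request. This is a conceptual illustration rather than the connector's exact request, and the host, index, and query string are made up:

```scala
// Conceptual illustration of a shard-scoped search using the `preference`
// parameter: each reading task asks only for the data of "its" shard, so no
// coordinating node has to merge results from all shards.
import scala.io.Source

val host  = "localhost:9200"   // placeholder: the ES node local to this task
val index = "my-index"         // placeholder index
val shard = 0                  // the shard this task is responsible for

// "_shards:0|_local" = search only shard 0 and prefer the locally held copy
// ("|" is URL-encoded as %7C). The query itself is just an example.
val url = s"http://$host/$index/_search?preference=_shards:$shard%7C_local&q=user:kimchy"

val source = Source.fromURL(url)
try println(source.mkString) finally source.close()
```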

In this case ES is not talking to Spark; it is only responding to requests. But yes, we provide the data that Spark needs to determine where tasks should be scheduled for data locality, and each task that starts up is pre-programmed by its input partition definition to have an affinity for the node the data is stored on.

These features are all core design decisions and don't require any special configuration to enable. You can find information about the project architecture here, and a general feature overview here. The documentation is regularly updated as new features and fixes land, and is a great resource to explore.
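For completeness, a collocated deployment really only needs the baseline connection settings; the main thing is not to turn on options that defeat locality. A sketch, with placeholder values:

```scala
// Baseline connector settings for a collocated Spark/ES cluster. There is no
// dedicated "locality" switch; just avoid settings that bypass it.
import org.apache.spark.SparkConf

val conf = new SparkConf()
  .setAppName("collocated-es-read")
  .set("es.nodes", "localhost")       // each worker reaches its local ES node
  .set("es.port", "9200")
  // Defaults shown explicitly: node discovery stays on, WAN-only mode stays
  // off. Setting es.nodes.wan.only to true routes all traffic through the
  // declared nodes and gives up data locality, so keep it false here.
  .set("es.nodes.discovery", "true")
  .set("es.nodes.wan.only", "false")
```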

The Spark integration actually shares almost all of its code under the hood with the rest of the integrations. SparkSQL is even more special because of the number of extension points it offers to connector developers. One thing this allows us to do is push down filter operations to the Elasticsearch level. This saves on data transmission costs by filtering the results on the Elasticsearch side before returning them to Spark for further processing.
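As a quick illustration of the pushdown, here is a small SparkSQL sketch (assuming Spark 2.x with the es-hadoop 5.x connector on the classpath; the index and field names are invented):

```scala
// SparkSQL read with filter pushdown: DataFrame filters are translated into
// an Elasticsearch query, so only matching documents travel back to Spark.
import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder()
  .appName("es-pushdown")
  .config("es.nodes", "localhost")   // placeholder collocated ES node
  .config("es.port", "9200")
  .getOrCreate()

val df = spark.read
  .format("org.elasticsearch.spark.sql")
  .option("pushdown", "true")        // default; shown here for clarity
  .load("my-index/my-type")          // placeholder resource

// This filter is applied on the Elasticsearch side before results are returned.
val active = df.filter(df("status") === "active")
active.explain(true)                 // the plan lists the filters handed to the source
active.show()
```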

I hope that clears up some of your concerns!
