Let's suppose we have a Spark cluster with Elasticsearch running on each node.
We've created an RDD of documents somehow and want to write them to ES.
Will the documents be written locally? That is, if a partition is on node1, will all of its documents be sent to node1, with no documents transferred between hosts?
Reading the code (RestService):
//currentSplit is partitionId
// check invalid splits (applicable when running in non-MR environments) - in this case fall back to Random..
int selectedNode = (currentSplit < 0) ? new Random().nextInt(nodes.size()) : currentSplit % nodes.size();
This is just round robin, so the data will be tossed around the network, if I understand this correctly.
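To make the effect concrete, here is a minimal sketch of that selection logic, lifted out of RestService into a standalone class. The class name, the method name `selectNode`, and the node names are hypothetical; only the expression itself comes from the quoted code.

```java
import java.util.List;
import java.util.Random;

final class RoundRobinSketch {
    // Mirrors the quoted expression: a non-negative partition ID maps to
    // partitionId % nodeCount; a negative one falls back to a random node.
    static int selectNode(int currentSplit, int nodeCount, Random random) {
        return (currentSplit < 0) ? random.nextInt(nodeCount) : currentSplit % nodeCount;
    }

    public static void main(String[] args) {
        // Hypothetical node list, used only for illustration.
        List<String> nodes = List.of("node1", "node2", "node3");
        Random random = new Random();
        for (int partition = 0; partition < 6; partition++) {
            int target = selectNode(partition, nodes.size(), random);
            System.out.println("partition " + partition + " -> " + nodes.get(target));
        }
    }
}
```

The target depends only on the partition ID and the size of the node list, not on the host where the task happens to run, so a partition processed on node1 can easily be assigned to node2 or node3.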
That is correct. Most write patterns do not end up with all data in a request going to a single node, and it's impossible for us to determine which shard a document will be hashed to at write time without sending the data to the cluster and letting it decide. Even if an Elasticsearch node is local to the write process, the chance that the node will need to forward data over the network depends on how many shards you have and how many nodes they're spread across.
As such, for write operations, we try to filter down the nodes that we write to so that they only include nodes that are hosting shards for the index we're writing to. Since there's no way to know which node the data will end up on, we round-robin the partitions across the target nodes to spread the load evenly. If we were to prefer local node writes, it would upset the balance of traffic to the target nodes, which could slow down processing.
We could potentially allow a property to be set that skips the filtering process and prefers talking to a local node for all writes, leaving the local node to manage all routing of the data. Could you describe your current setup and whether it might take advantage of this?
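As a rough illustration of the two behaviours discussed above, the sketch below filters the node list down to nodes hosting shards for the index and round-robins over them, with an opt-out that always picks the local node. Everything here is hypothetical: the class, the `chooseTarget` method, and the `preferLocal` flag are not existing es-hadoop names or settings, and `Math.floorMod` is used in place of the random fallback from the quoted code.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Set;

final class WriteTargetSketch {
    // Hypothetical helper: picks the node a partition should write to.
    static String chooseTarget(List<String> clusterNodes, Set<String> shardHosts,
                               String localNode, int partitionId, boolean preferLocal) {
        // Proposed opt-out (hypothetical flag): skip filtering and let the
        // local node route the documents to the right shards.
        if (preferLocal && clusterNodes.contains(localNode)) {
            return localNode;
        }
        // Default behaviour as described: keep only nodes hosting shards
        // of the target index.
        List<String> candidates = new ArrayList<>();
        for (String node : clusterNodes) {
            if (shardHosts.contains(node)) {
                candidates.add(node);
            }
        }
        // Assumption: if nothing survives the filter, use the full list.
        if (candidates.isEmpty()) {
            candidates = clusterNodes;
        }
        // Round robin by partition ID; floorMod keeps the index in range.
        return candidates.get(Math.floorMod(partitionId, candidates.size()));
    }
}
```

With nodes `node1`, `node2`, `node3` and shards on `node1` and `node3` only, a task running on `node2` with partition 1 would write to `node3` by default and to `node2` with the flag set.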
A cluster where each node has one ES instance running. Spark is launched in standalone mode with one worker per node. The Spark master is located on one of the nodes.
The same, but the cluster has two ES instances on each node.
One use case for me is reading documents from Kafka and writing them into ES. The Kafka servers are also located one per node, so with this feature on, all writes would be local. Currently the round-robin distribution produces network load.
What additional information could I provide you about my setup?
The number of shards is greater than the number of nodes and no filtering is enabled, so I think it is safe to assume that each node has at least one shard for each index.
The whole cluster is homogeneous, i.e. all nodes are almost indistinguishable.
I'm not using dynamic resources in es-hadoop, but I have a feature request for a related issue: https://github.com/elastic/elasticsearch-hadoop/issues/991
Currently I create a REST client for each partition and perform bulk requests with it.
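For readers unfamiliar with that workaround, here is a minimal sketch of what a per-partition bulk request could look like using only the JDK HTTP client. It is not the poster's actual client, which is not shown in the thread. The class and method names are hypothetical, and the sketch assumes each document is already a single-line JSON string and that the index name needs no JSON escaping.

```java
import java.net.URI;
import java.net.http.HttpRequest;
import java.util.List;

final class BulkBodySketch {
    // Builds a newline-delimited _bulk body: one action line followed by
    // one source line per document, with a trailing newline after each.
    static String buildBulkBody(String index, List<String> jsonDocs) {
        StringBuilder body = new StringBuilder();
        for (String doc : jsonDocs) {
            body.append("{\"index\":{\"_index\":\"").append(index).append("\"}}\n");
            body.append(doc).append('\n');
        }
        return body.toString();
    }

    // Prepares (but does not send) a POST to the _bulk endpoint of the
    // given node; nodeUrl would be the local node in the setup described.
    static HttpRequest buildBulkRequest(String nodeUrl, String body) {
        return HttpRequest.newBuilder(URI.create(nodeUrl + "/_bulk"))
                .header("Content-Type", "application/x-ndjson")
                .POST(HttpRequest.BodyPublishers.ofString(body))
                .build();
    }
}
```

Inside something like `foreachPartition`, one such request per batch could be sent to `http://localhost:9200` with `java.net.http.HttpClient`, which keeps the first network hop local and leaves shard routing to the receiving node.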