[Hadoop] Writing directly to shards in EsOutputFormat and shard awareness

Hi, I have 2 related questions regarding routing write requests. Thanks in
advance for answering!

Question 1:
I saw these lines in the EsOutputFormat class and I was wondering why:
(https://github.com/elasticsearch/elasticsearch-hadoop/blob/master/mr/src/main/java/org/elasticsearch/hadoop/mr/EsOutputFormat.java#L221)
(https://github.com/elasticsearch/elasticsearch-hadoop/blob/master/mr/src/main/java/org/elasticsearch/hadoop/rest/RestRepository.java#L239)

        Map<Shard, Node> targetShards = repository.getWriteTargetPrimaryShards();
        repository.close();


        List<Shard> orderedShards = new ArrayList<Shard>(targetShards.keySet());
        // make sure the order is strict
        Collections.sort(orderedShards);

        // if there's no task info, just pick a random bucket
        if (currentInstance <= 0) {
            currentInstance = new Random().nextInt(targetShards.size()) + 1;
        }
        int bucket = currentInstance % targetShards.size();
        Shard chosenShard = orderedShards.get(bucket);
        Node targetNode = targetShards.get(chosenShard);


        // override the global settings to communicate directly with the target node
        settings.setHosts(targetNode.getIpAddress()).setPort(targetNode.getHttpPort());
        repository = new RestRepository(settings);
        uri = SettingsUtils.nodes(settings).get(0);

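I was trying to understand why this is being done. My understanding of ES
writes was that:

  • You can send a write request to any node in ES
  • That node will receive the request, hash the document id and, based on
    the hash, work out which node is supposed to hold the primary
  • The write will be re-routed to the node meant to be the primary for
    that document id/hashcode
  • If this is the case (like in any data grid such as Hazelcast or
    Coherence, or NoSQL stores such as HBase, Cassandra, etc.), why the
    special logic to find the primaries?
      o (http://www.elasticsearch.org/guide/en/elasticsearch/guide/current/distrib-write.html)
  • This primary may not even be the one that will handle the write for my
    document
  • Is this being done simply to load balance writes uniformly across
    the cluster? In that case why not do a simple round robin?
      o (https://github.com/elasticsearch/elasticsearch-hadoop/edit/1.3/docs/src/reference/asciidoc/core/arch.adoc)

Question 2:
Is there a way to hook into the routing logic of the cluster?

  • Instead of picking up documents in bulk and sending them to a node in
    ES, which then resends each document to the primary responsible for it,
  • is there a way to find out which node is the primary for a document,
  • collect all documents meant to go to a specific primary and send them
    directly to that node?
      o (Like Hazelcast or Coherence's partition awareness:
        http://www.hazelcast.org/docs/3.0/manual/html-single/#DataAffinity)
  • The problem with this might be that if the primary dies, the sender
    has to be aware of the topology change and resend the data to the new
    primary
  • So, we're not talking about simple clients but smart-client libraries.
    Perhaps I should be asking: does the Java client library already do
    this? (I didn't think so)
  • Overall, wouldn't this save a lot of data re-routing over the network?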

Thanks.

--
You received this message because you are subscribed to the Google Groups "elasticsearch" group.
To unsubscribe from this group and stop receiving emails from it, send an email to elasticsearch+unsubscribe@googlegroups.com.
To view this discussion on the web visit https://groups.google.com/d/msgid/elasticsearch/6996e823-0b41-42f8-b473-1afe0a8a4c1b%40googlegroups.com.
For more options, visit https://groups.google.com/d/optout.

1. Performance reasons
While a write request can be sent to any node, which in turn will do
proxying, we can avoid this and only hit the primaries. This avoids the
proxying, rerouting. Note that each task that is writing is assigned a
different primary in a round-robin fashion, so effectively the writes
happen in parallel across the primaries of the target index.
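The round-robin assignment can be sketched in plain Java. This is a simplified re-creation of the bucket logic from the EsOutputFormat excerpt quoted in the question, not the actual es-hadoop class. The shard ids, the node names and the `pickNode` helper are hypothetical, and a plain shard-id-to-node-name map stands in for the `Map<Shard, Node>` returned by `getWriteTargetPrimaryShards()`.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.Random;
import java.util.TreeMap;

public class PrimaryAssignment {

    // Hypothetical helper: picks the node a task should write to. It follows
    // the quoted excerpt: sort the shard ids so every task sees the same
    // order, then use the task number modulo the shard count as the bucket.
    static String pickNode(Map<Integer, String> primaryToNode, int taskInstance, Random random) {
        List<Integer> orderedShards = new ArrayList<>(primaryToNode.keySet());
        Collections.sort(orderedShards);
        int instance = taskInstance;
        if (instance <= 0) {
            // no task info: fall back to a random bucket
            instance = random.nextInt(orderedShards.size()) + 1;
        }
        int bucket = instance % orderedShards.size();
        return primaryToNode.get(orderedShards.get(bucket));
    }

    public static void main(String[] args) {
        // Hypothetical layout: 3 primaries of the target index on 3 nodes.
        Map<Integer, String> primaries = new TreeMap<>();
        primaries.put(0, "node-a");
        primaries.put(1, "node-b");
        primaries.put(2, "node-c");

        Random random = new Random(42);
        for (int task = 1; task <= 6; task++) {
            System.out.println("task " + task + " -> " + pickNode(primaries, task, random));
        }
    }
}
```

With three primaries, tasks 1, 2 and 3 land on node-b, node-c and node-a, and then the cycle repeats. Six tasks therefore put two writers on each primary instead of all six on a single node.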

2. What exactly are you trying to achieve? A map-reduce job ends up with
multiple tasks hitting an ES cluster; if you pick only one node, you're
likely to overwhelm it while the rest of the cluster sits idle. If you
spread the load randomly, the nodes will re-route the calls to the primaries
first and then to the replicas (depending on your settings).
By talking directly to the primaries, you reduce unnecessary IO and CPU
(caused by the proxying and hashing) and you get better throughput and
negotiation between ES and Hadoop. Consider again an imbalanced
scenario: each node that you add for proxying can become a liability; maybe
the node is busy with other activities, maybe it goes offline, etc.

Note that es-hadoop retries on both rejections (ES is busy) and network
failures, whether ephemeral or permanent, by switching to a different
node.
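Here is a rough sketch of that retry behaviour, built on stated assumptions. The `BulkTransport` interface and the two exception types are hypothetical stand-ins. The policy shown is one way to express the idea, not es-hadoop's actual code: on a rejection, back off and retry the same node; on a network failure, move on to the next node.

```java
import java.util.List;

public class RetryingWriter {

    // Hypothetical stand-ins for "ES is busy" and "node unreachable".
    static class RejectedException extends Exception {}
    static class NetworkException extends Exception {}

    // Hypothetical transport: sends one bulk request to one node.
    interface BulkTransport {
        void send(String node, String bulkBody) throws RejectedException, NetworkException;
    }

    // Retries a bulk: on a rejection, sleep and retry the same node;
    // on a network failure, fail over to the next node in the list.
    static String write(BulkTransport transport, List<String> nodes, String bulkBody,
                        int maxAttempts, long backoffMillis) throws InterruptedException {
        int nodeIndex = 0;
        for (int attempt = 1; attempt <= maxAttempts; attempt++) {
            String node = nodes.get(nodeIndex);
            try {
                transport.send(node, bulkBody);
                return node; // success: report which node accepted the bulk
            } catch (RejectedException e) {
                Thread.sleep(backoffMillis * attempt); // linear backoff, same node
            } catch (NetworkException e) {
                nodeIndex = (nodeIndex + 1) % nodes.size(); // fail over
            }
        }
        throw new IllegalStateException("bulk not written after " + maxAttempts + " attempts");
    }

    public static void main(String[] args) throws InterruptedException {
        // Simulated cluster: node-a is down, node-b rejects once, then accepts.
        int[] rejections = {1};
        BulkTransport transport = (node, body) -> {
            if (node.equals("node-a")) {
                throw new NetworkException();
            }
            if (rejections[0]-- > 0) {
                throw new RejectedException();
            }
        };
        String accepted = write(transport, List.of("node-a", "node-b"), "{...}", 5, 10);
        System.out.println("accepted by " + accepted);
    }
}
```

In the simulated run, node-a is unreachable, so the writer fails over to node-b. It then waits out one rejection, succeeds, and prints `accepted by node-b`.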

As for computing the hash for each document and guessing which exact primary
it will hit, there are certain challenges with that:
a. the client has to replicate the routing logic in ES (which can change
across versions, settings, etc.)
b. each hadoop task that writes ends up making multiple connections across
ES primaries. While this might work for small indices with a small number
of shards, for anything else this approach will be inefficient. You will
end up with significantly more connections that can (and will) fail.
c. each bulk will be divided into smaller bulks, which reduces their
efficiency, especially since the load itself is not consistent.
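To make points a to c concrete, here is what a shard-aware client would have to do. Elasticsearch routes a document with `shard = hash(routing) % number_of_primary_shards`, and the routing value defaults to the document id. The `String.hashCode()` used below is an assumed stand-in. It does not match Elasticsearch's real hash function, and that mismatch is exactly the fragility described in point a. The class and method names are hypothetical.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

public class ClientSideRouting {

    // Stand-in for the server-side routing function (NOT Elasticsearch's real
    // hash): shard = hash(docId) mod number_of_primary_shards. A smart client
    // would have to keep this in sync with every ES version it talks to.
    static int shardFor(String docId, int numberOfPrimaryShards) {
        return Math.floorMod(docId.hashCode(), numberOfPrimaryShards);
    }

    // Splits one bulk into per-shard bulks, as a shard-aware client would.
    static Map<Integer, List<String>> splitBulk(List<String> docIds, int numberOfPrimaryShards) {
        Map<Integer, List<String>> perShard = new TreeMap<>();
        for (String id : docIds) {
            perShard.computeIfAbsent(shardFor(id, numberOfPrimaryShards), k -> new ArrayList<>())
                    .add(id);
        }
        return perShard;
    }

    public static void main(String[] args) {
        List<String> bulk = new ArrayList<>();
        for (int i = 0; i < 1000; i++) {
            bulk.add("doc-" + i);
        }
        // One 1000-document bulk against a hypothetical 20-shard index turns
        // into up to 20 smaller bulks, each possibly bound for a different node.
        Map<Integer, List<String>> perShard = splitBulk(bulk, 20);
        System.out.println(perShard.size() + " sub-bulks");
        perShard.forEach((shard, ids) -> System.out.println("shard " + shard + ": " + ids.size() + " docs"));
    }
}
```

Splitting one 1,000-document bulk across a 20-shard index yields up to 20 sub-bulks, averaging 50 documents each, and each may be headed for a different node. That is points b and c in miniature.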

es-hadoop is a client optimized for Hadoop environments, with the emphasis on
'client': it does not, and should not, try to outsmart ES. We strive for
excellent performance, but without sacrificing reliability. Once data is
inside ES, it gets all its goodies: replication, sharding, etc.

If you're worried about performance, which we always try to improve, I'd be
happy to look into some concrete benchmarks.

Hope this helps,

On Sun, May 4, 2014 at 8:41 PM, Ashwin Jayaprakash <ashwin.jayaprakash@gmail.com> wrote:

--
To view this discussion on the web visit https://groups.google.com/d/msgid/elasticsearch/CAJogdmfZA-DErALEerKCKWCC1S7_tSwymuopZ7EOpA4J%3DfwH9Q%40mail.gmail.com.

I'm not sure I understand this: "write request can be sent to any node,
which in turn will do proxying, we can avoid this and only hit the
primaries. This avoids the proxying, rerouting".

Even if you hit a "primary", ES will still have to re-route the document to
"the primary shard handling the hash of the doc", which could very well be
on a different node. Isn't this true, as in any other distributed system?
So, how do you save that extra hop, as you claim ("avoids the proxying,
rerouting")? What you described seems to directly conflict with what is
described in the Elasticsearch documentation, unless I'm mistaken.

Regards.

On Sunday, May 4, 2014 11:19:30 AM UTC-7, Costin Leau wrote:

--
To view this discussion on the web visit https://groups.google.com/d/msgid/elasticsearch/680f38b0-b2c4-4e22-b3c2-46343882a988%40googlegroups.com.

To reiterate my previous email: by talking directly to primaries, and assuming a
uniform distribution of documents, at least 1/X of the documents sent (where X is
the number of shards) are indexed by the contacted node's primary without
proxying, while the rest are proxied to their respective shards (why we don't
compute the target shard directly is covered in my previous email). Talking to a
node that hosts no primary of the target index would guarantee proxying for all
of the documents being sent.
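The 1/X estimate can be checked with a small simulation. It assumes uniformly distributed document ids and uses a `String.hashCode()` stand-in for Elasticsearch's routing hash, which is not the real function. `localFraction` and the shard layouts are hypothetical.

```java
import java.util.Set;

public class ProxyingEstimate {

    // Stand-in routing hash (not Elasticsearch's real one): shard = hash(id) mod X.
    static int shardFor(String docId, int shards) {
        return Math.floorMod(docId.hashCode(), shards);
    }

    // Fraction of documents the contacted node can index locally, i.e. whose
    // target primary lives on that node; every other document gets proxied.
    static double localFraction(int docs, int shards, Set<Integer> primariesOnNode) {
        int local = 0;
        for (int i = 0; i < docs; i++) {
            if (primariesOnNode.contains(shardFor("doc-" + i, shards))) {
                local++;
            }
        }
        return (double) local / docs;
    }

    public static void main(String[] args) {
        int shards = 5;
        int docs = 100_000;
        // Node hosting one primary of the index: roughly 1/X stays local.
        System.out.println("node with primary 0: " + localFraction(docs, shards, Set.of(0)));
        // Node hosting no primary of the index: every document is proxied.
        System.out.println("node with no primary: " + localFraction(docs, shards, Set.of()));
    }
}
```

With five shards, a node hosting primary 0 should report a fraction close to 0.2, and a node hosting no primary reports 0.0. Passing a set with two primaries (for example `Set.of(0, 1)`) gives roughly 2/X, which is why the estimate is "at least" 1/X.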

On 5/6/14 6:15 AM, Ashwin Jayaprakash wrote:

--
Costin

--
To view this discussion on the web visit https://groups.google.com/d/msgid/elasticsearch/53688A41.5070903%40gmail.com.

Yeah - I figured as much.

Thanks.

On Tuesday, May 6, 2014 12:07:45 AM UTC-7, Costin Leau wrote:

--
To view this discussion on the web visit https://groups.google.com/d/msgid/elasticsearch/bc38ea3f-cbf3-48a3-9763-5e9ff295b0b3%40googlegroups.com.