[Hadoop] Routing keys, _id's and multi nodes in elasticsearch-hadoop

Hey Costin, great work on elasticsearch-hadoop. The Cascading tap is nicely
laid out and very easy to use as a sink (I haven't tried reading yet).

  1. Can we get support for routing keys when using the Tap as a sink? I know
    it can be tricky to get that right and coordinate with the buffered bulk
    REST client while actually getting the performance benefits from it. Maybe
    we could do that by having a predetermined set of routing keys and
    programmatically creating Taps for each one (if only we had routing key
    support in Tap).

  public Map<Pipe, Tap> createSinks(Pipe pipe, String resource, List<String> routes) {
    Map<Pipe, Tap> sinksByRoutingKey = new HashMap<>(routes.size());
    for (String routingKey : routes) {
      Pipe filteredPipe = new Each(pipe, new Fields("routingField"), new RoutingKeyFilter(routingKey));
      ESTap tapForRoutingKey = new ESTap(resource, routingKey, new Fields("x", "y", "z"));
      sinksByRoutingKey.put(filteredPipe, tapForRoutingKey);
    }
    return sinksByRoutingKey;
  }
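For reference, RoutingKeyFilter above is a filter I'd write myself (it's not part of es-hadoop) that keeps only tuples whose routing field matches the given key. Stripped of the Cascading types, the idea is plain partitioning by routing key; a self-contained sketch:

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

public class RoutingPartitioner {

    // Groups records by their routing field, mimicking one filtered
    // pipe/tap pair per predetermined routing key; records with an
    // unknown routing key are dropped, just as the filter would drop them.
    public static Map<String, List<Map<String, String>>> partition(
            List<Map<String, String>> records, List<String> routingKeys) {
        Map<String, List<Map<String, String>>> sinks = new HashMap<>();
        for (String key : routingKeys) {
            sinks.put(key, new ArrayList<>());
        }
        for (Map<String, String> record : records) {
            List<Map<String, String>> sink = sinks.get(record.get("routingField"));
            if (sink != null) {
                sink.add(record);
            }
        }
        return sinks;
    }
}
```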

  2. How would I set the _id field using this tap? I tried to use the mapping
    to extract the _id from a doc field, but it didn't seem to be used: the
    _id always ended up as a generated string instead of the number field I
    wanted. I understand the type of the _id field is string and can't change,
    but I wanted to use the value of a number field as the _id (just as a
    numerical string, of course). I don't see any errors in ES or in the
    es-hadoop job when it runs. It just seems to silently ignore the mapping
    and use the standard generated ids. I'm not sure where the problem is.

  "mappings": {
    "my_type": {
      "properties": {
        "_id": {
          "type": "string",
          "path": "my_id"
        },
        "my_id": {
          "type": "long",
          "store": "no",
          "index": "analyzed",
          "include_in_all": false
        }
      }
    }
  }

Changing the type of my_id to string (and "index" to "not_analyzed") did
not help either.

  3. I'm creating a different index for each day of data (using the
    date-based data flow Shay talks about), and when doing a full index, this
    ends up building about 60 days' worth of data (60 indexes in parallel).
    There's a flow for each day's index and a corresponding ESTap. I would
    like to hit a different node in the ES cluster from each of these taps.
    Since Properties (for local mode) or JobConf (for hadoop mode) only allows
    specifying one es.host, they all end up going to the same host. Or does it
    actually sniff out the other hosts and use them too? Looking at the
    cluster, it seems like only some of the nodes are getting traffic. Am I
    doing something wrong?

  4. Is it possible to split the es-hadoop library into its constituent
    parts? I'm only using Cascading, not Hadoop directly, Hive, or Pig.
    Having those dependencies brought in is creating a dependency management
    nightmare.

Thanks,
Peter

--
You received this message because you are subscribed to the Google Groups "elasticsearch" group.
To unsubscribe from this group and stop receiving emails from it, send an email to elasticsearch+unsubscribe@googlegroups.com.
For more options, visit https://groups.google.com/groups/opt_out.

Hi,

  1. The next milestone release (we're working on pushing the current
    codebase as 1.3.0.M1) will focus on document ids, which will facilitate
    things like routing. Ideally I'd like to support routing directly at the
    M/R level through configuration alone, without any additional code, and
    potentially express that better through the various higher abstractions
    (like Cascading). That is, have routing with minimal to no change in the
    code.
    That's the plan at least. I like your suggestion about the map and
    multiple taps. It might be that routing can be added transparently, so
    you'd end up with only one Tap.

  2. Right - this is planned for the next development iteration (see 1).
    Currently the data is just appended/inserted; since we don't take the id
    into account, there's no update yet. But this will change shortly.

  3. Parallel/distributed writing is a 'tricky' subject, and the next release
    should address most of the demands here. By tricky I mean that ES routes
    the data internally depending on sharding, replicas and user-defined
    routing. That is, even if you send data to node X, some of it might very
    well be routed to node Y. For large volumes of data, folks are interested
    in writing to multiple nodes at once; however, from the tests I've done
    (not extensive, though), that's not really a problem unless the network
    gets saturated, which happens quite rarely and only when there are enough
    Hadoop nodes and splits writing at the same time.
    That being said, we'd like to introduce the option of specifying multiple
    nodes for fail-over purposes (if one node dies, we can simply talk to the
    next one and so on) and potentially extend that to support parallel
    writes. However, this will not just work out of the box; it still depends
    on whether the source is splittable and whether there is more than one
    reducer.

Bottom line: parallel writing doesn't depend just on ES, will not work all
the time, and doesn't have the impact on performance (if any) in Hadoop
environments that most people assume.
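To make the routing point concrete: which shard (and therefore which node) a document ends up on is a function of its id/routing key, not of the node you happen to send it to. A deliberately simplified sketch of the mechanism (this is not ES's actual hash function):

```java
public class ShardRouter {

    // Simplified stand-in for ES's internal routing:
    // shard = hash(routing) % number_of_primary_shards.
    public static int shardFor(String routing, int numberOfShards) {
        return Math.floorMod(routing.hashCode(), numberOfShards);
    }
}
```

Whichever node receives a document forwards it to the node hosting that shard, which is why writing to node X can still produce traffic on node Y.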

  4. Yes it is, which means having 5 jars: -all, -cascading, -mr (on which
    all the others depend), -pig, -hive. The jars can either depend on -mr or
    all be self-contained. However, I've postponed the decision since with
    YARN (which requires recompilation and thus slightly different bytecode),
    we'd double that - meaning we end up with 10 jars, which seems extreme to
    me :slight_smile:

That being said, can you please raise an issue for routing (1) and split
jars (4)? For (2) and (3) we already have issues raised.
Thanks for the kind words and the feedback - this is great!

Cheers,

On Sat, Sep 14, 2013 at 7:43 PM, peter@vagaband.co wrote:


Good to hear that some of this is already planned, and thanks for the quick
reply.

  1. I like your approach to supporting routing keys better than mine; mine
    might be easier to implement, but it's probably not the preferable one.

  2. So if I understand correctly, ES only uses _ids extracted from other doc
    fields on "create" (which makes sense), but we're not doing "create"s
    with ESTap, rather "appends", so that field is ignored. What API
    operation does "append" actually map to? Just curious. Glad to see it's
    getting addressed.

  3. In this case I'm building 60 different indexes with 0 replicas and 1
    shard (this is a different index from the one I need routing keys for, as
    mentioned in #1). I only need 1 shard since it's per-date, and I can
    increase the number of replicas after the index has been built completely
    and made searchable. I have noticed that in our QA environment this
    process takes close to 2 hours, but in production it takes a matter of
    minutes. QA and production have identical ES nodes in terms of config and
    resource allocation; the only difference is the number of nodes (QA has 2
    while production has 16), with the same amount of data being indexed. I
    assume QA is slower because only 2 nodes are being utilized while
    production uses far more. But even during indexing in production, we only
    see a few of the 16 nodes being used. I'd like to better utilize them and
    spread the load across all 16.

I think you have a much grander plan for distributed writing than I have...
I just looked, and I think what I want is covered by this ->
allow multiple hosts (for resilience/load balancing) in es.resource · Issue #74 · elastic/elasticsearch-hadoop · GitHub
  4. I see your predicament with YARN :frowning: We'll need to get creative,
    I guess, to avoid an unnecessary number of jars.

I created the two issues on GitHub as you requested.

Thanks again,
Peter


  2. It's not ES per se but es-hadoop, which simply does not add any id
    information; that translates the REST call into an insert (under
    [index]/[type]).
    The "update"/"upsert" would use the same endpoint but a different payload.
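In REST terms the difference is only the action metadata line sent to the bulk endpoint: today es-hadoop emits an index action with no _id (so ES generates one); honoring an id means emitting it in that metadata. A rough sketch of the two payload shapes (illustrative only, not es-hadoop's actual code):

```java
public class BulkPayload {

    // Bulk requests are newline-delimited JSON: an action metadata line
    // followed by the document source line.

    // No _id in the metadata: ES auto-generates one (today's behavior).
    public static String insertWithoutId(String index, String type, String doc) {
        return "{\"index\":{\"_index\":\"" + index + "\",\"_type\":\"" + type
                + "\"}}\n" + doc + "\n";
    }

    // _id supplied in the metadata: ES indexes the document under that id.
    public static String indexWithId(String index, String type, String id, String doc) {
        return "{\"index\":{\"_index\":\"" + index + "\",\"_type\":\"" + type
                + "\",\"_id\":\"" + id + "\"}}\n" + doc + "\n";
    }
}
```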

On 14/09/2013 9:58 PM, peter@vagaband.co wrote:


I think you have a much grander plan for distributed writing than I have... I just looked, and I think what I want is
covered by this ->
allow multiple hosts (for resilience/load balancing) in es.resource · Issue #74 · elastic/elasticsearch-hadoop · GitHub

How do you write data to ES? Are you using Hadoop at all, or not? I ask since this is typically the problem here: how
many tasks are running at once, since using a small number simply doesn't exploit the batch capabilities of Hadoop
(that is, things are slower).

  1. I see your predicament with Yarn :frowning: We'll need to get creative I guess to avoid unnecessary number of jars.

I don't think there are too many options here. The ideal solution would be for YARN to be backwards compatible (hence
no different binaries), but that's a different story.

I created two issues in Github as you requested.

Thanks!


--
Costin
