How do I set routing option with elasticsearch-hadoop plugin in storm?

This thread is about routing based on a document field. From the first posts:

You hijacked the thread to talk about routing, but based on a fixed value (defined per bolt, if I understand correctly).
Which is a valid use case, except you do not respond to my questions about how you want to use it and what the user experience should be, but rather describe what you think happens and how you plan to fix it. Which is fine, except again, it is not what I asked.
Latest example:

Either way, I think I found a possible optimization to be made in ES-Hadoop so that reads/writes use only the relevant shards in the case of a fixed/constant routing. I've raised an issue here:

You hijacked the thread to talk about routing, but based on a fixed value (defined per bolt, if I understand correctly).

I did not realize that.
I thought my content was in line with the title of the post.

Anyway, I am happy that a correct issue has been filed.
Thank you Costin for doing that! Appreciate your help.

@ted.fed I've double-checked and ES-Hadoop works fine with ES 2.x, including routing. That's because ES-Hadoop doesn't rely on ES to extract the routing from the fields but rather extracts it itself from each document (of course, a constant can be provided).
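
For reference, a minimal sketch of the two modes (the "storm/docs" target and the field/constant names here are placeholders; the angle-bracket notation for constants is the one described in the ES-Hadoop configuration docs):

import java.util.HashMap;
import java.util.Map;

import org.elasticsearch.storm.EsBolt;

public class RoutingConfigSketch
{
    public static void main(String[] args)
    {
        Map<String, String> conf = new HashMap<String, String>();

        // Field-based routing: the value is extracted from each document
        conf.put("es.mapping.routing", "customer_id");

        // Constant routing instead: the <...> notation marks a constant value
        // conf.put("es.mapping.routing", "<my_fixed_routing>");

        EsBolt bolt = new EsBolt("storm/docs", conf);
    }
}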

Cheers,

Thanks @costin
Appreciate your effort in pushing a test case for this!

@costin, I saw the second issue you filed in this conversation.
I want to share what we plan to do at this point and add a suggestion/wish.

If es.mapping.routing names a field in the document whose value is to be used as the routing value, then it seems that the bulk request will include a routing value per bulk item (reference link, look for routing).

So if routing is indeed per bulk item and we want all the documents in one bulk request to be indexed on a single ES shard, then we need to make sure that all documents in one bulk request carry the same routing key.
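
To make the per-item point concrete, here is a hedged sketch of what such a bulk body looks like on the wire (index/type/routing values are made up; ES 2.x uses the "_routing" key in the action metadata, later versions rename it to "routing"):

public class BulkRoutingSketch
{
    public static void main(String[] args)
    {
        // Each action line carries its own routing value, so the documents of
        // a single bulk request may land on different shards.
        StringBuilder bulk = new StringBuilder();
        bulk.append("{\"index\":{\"_index\":\"storm\",\"_type\":\"docs\",\"_routing\":\"bolt-1\"}}\n");
        bulk.append("{\"message\":\"first doc\"}\n");
        bulk.append("{\"index\":{\"_index\":\"storm\",\"_type\":\"docs\",\"_routing\":\"bolt-1\"}}\n");
        bulk.append("{\"message\":\"second doc\"}\n");
        System.out.println(bulk);
        // POSTed to /_bulk; because both action lines carry the same routing
        // value, the whole request targets a single shard.
    }
}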

This can be done by specifying a message-transformer class in the "es.ser.writer.value.class" option.
The class specified with this option should extend the org.elasticsearch.hadoop.serialization.builder.JdkValueWriter class and override its write() method to add the routing field defined by "es.mapping.routing".
Something like this:

// Imports assume Storm 1.x; on Storm 0.x use backtype.storm.tuple.Tuple instead
import java.util.HashMap;
import java.util.Map;

import org.apache.storm.tuple.Tuple;
import org.elasticsearch.hadoop.serialization.Generator;
import org.elasticsearch.hadoop.serialization.builder.JdkValueWriter;

public class EsBoltMsgSerializer extends JdkValueWriter
{
    @Override
    public Result write(Object value, Generator generator)
    {
        Map<String, Object> fieldMap = convertTupleToMap((Tuple) value);
        // The field name must match the one configured via "es.mapping.routing"
        fieldMap.put("routing_field_of_doc", getUniqueRoutingValueForCurrentThread());
        return super.write(fieldMap, generator);
    }

    // Tuple does not allow updating the value of a key, so we copy it into a map
    Map<String, Object> convertTupleToMap(Tuple tuple)
    {
        Map<String, Object> fieldMap = new HashMap<String, Object>();
        for (String field : tuple.getFields())
        {
            fieldMap.put(field, tuple.getValueByField(field));
        }
        return fieldMap;
    }

    // Placeholder: in our topology this returns a routing value that is
    // constant per bolt instance but different across bolts
    String getUniqueRoutingValueForCurrentThread()
    {
        return Thread.currentThread().getName();
    }
}
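
For completeness, a sketch of how this serializer would be wired into the topology (the package name, component ids and the "storm/docs" target are placeholders; on Storm 0.x the builder lives under backtype.storm):

import java.util.HashMap;
import java.util.Map;

import org.apache.storm.topology.TopologyBuilder;
import org.elasticsearch.storm.EsBolt;

public class TopologyWiringSketch
{
    public static void main(String[] args)
    {
        Map<String, String> conf = new HashMap<String, String>();
        // Plug in the custom serializer shown above
        conf.put("es.ser.writer.value.class", "com.example.EsBoltMsgSerializer");
        // Must name the same field the serializer injects
        conf.put("es.mapping.routing", "routing_field_of_doc");

        TopologyBuilder builder = new TopologyBuilder();
        builder.setBolt("es-bolt", new EsBolt("storm/docs", conf), 4)
               .shuffleGrouping("doc-spout");
    }
}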

This strategy seems to be working for us for the time being, but I am still concerned about the extra hop that happens from the receiving ES primary shard to the primary shard targeted by the routing value. This extra hop happens because the routing value produced by the method above is not tied to the receiving primary shard; it is merely a value different from those of all other EsBolts.

If we can call the /index-name/_search_shards API and get a mapping between shard number and host name, then we can avoid this extra hop by specifying, for each bolt, an exact routing value that maps to the receiving ES primary shard. I think the best place to put this logic is inside the bolt itself, exposed through a user-friendly option such as "es.index.all.docs.on.receiving.shard".
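
As a rough illustration of the probing this would require (the option name above is a wish, not an existing setting; the host and index names here are made up), _search_shards can be asked where a candidate routing value lands:

import java.io.BufferedReader;
import java.io.InputStreamReader;
import java.net.HttpURLConnection;
import java.net.URL;

public class SearchShardsProbe
{
    public static void main(String[] args) throws Exception
    {
        // Ask ES which shard a candidate routing value maps to; a bolt could
        // repeat this until it finds a value landing on its pinned shard
        URL url = new URL("http://localhost:9200/storm/_search_shards?routing=candidate-value");
        HttpURLConnection conn = (HttpURLConnection) url.openConnection();
        conn.setRequestMethod("GET");
        BufferedReader reader =
                new BufferedReader(new InputStreamReader(conn.getInputStream(), "UTF-8"));
        // The response lists the matching shard copies plus a "nodes" section
        // with the addresses of the nodes hosting them
        String line;
        while ((line = reader.readLine()) != null)
        {
            System.out.println(line);
        }
        reader.close();
    }
}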

Avoiding this extra hop should be of great help in improving the performance of EsBolt and reducing network congestion among the nodes.

I disagree. When the routing is per item, connecting directly to the target shard becomes impossible, and it's best to let ES handle this.

Assume you have 1 bolt that writes to an index with 5 shards. Since the routing is dynamic, there is no way for the bolt to know where a document will go before seeing the actual document.
So the document comes in, the bolt extracts the field and needs to compute the hash. It calls _search_shards with the routing, gets the shard, makes a dedicated connection to it (despite the fact that there's already a connection pinned to one of the shards) and writes the doc.
For the second document, the same logic applies.
Now assume that the docs are not written right away but rather buffered into a bulk. This means that for every shard the bolt would keep a separate bulk queue and a dedicated connection for that bulk.

So instead of "collecting all documents, computing the routing and sending them in one bulk", the bolt would "for every document do a _search_shards call, and keep S bulk queues and S connections, where S is the number of shards".
Have 2 bolts? Double the number of calls to the cluster. And all that to avoid an extra hop.
If you are trying to limit the number of connections, moving that logic inside the client is really not the solution.
Furthermore, by splitting the docs into separate bulks one ends up with slightly different semantics in terms of how the documents are written - it shouldn't really affect the end result, but I suspect there might be corner cases where this changes the behaviour.
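
To make the bookkeeping in that scenario concrete, here is a minimal sketch of the per-shard state each bolt would have to carry (types deliberately simplified; this is not ES-Hadoop code):

import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

public class PerShardBulkState
{
    // One pending bulk per shard instead of a single bulk for the whole bolt
    private final Map<Integer, List<String>> bulkQueues = new HashMap<Integer, List<String>>();
    // One pinned connection per shard (represented here by a URL string)
    private final Map<Integer, String> connections = new HashMap<Integer, String>();

    void enqueue(String doc, int shard, String shardUrl)
    {
        // 'shard' would have to come from a _search_shards call per document
        if (!bulkQueues.containsKey(shard))
        {
            bulkQueues.put(shard, new ArrayList<String>());
            connections.put(shard, shardUrl);
        }
        bulkQueues.get(shard).add(doc);
    }
    // With S shards and B bolts this means S x B queues and connections -
    // the overhead argued against above
}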

Furthermore, generalizing this behaviour to the rest of the libraries, where the number of writing partitions is determined by the source (aka Spark/Hive/MR, etc.), means there can be dozens of such "bolts", each trying to hash things itself and opening S connections.

That being said, if there are certain API hooks that would facilitate the usage of ES-Hadoop in your case, let me know.