How do I set routing option with elasticsearch-hadoop plugin in storm?

I have gone through
and through

But none of these specify if one can use the routing option when the above is used as a bolt in storm.

Does anyone know how to do that?


See the Configuration section. All the fields specified there can be used across all the integrations.

Thank you Costin.

I see that the only option available there is:
es.mapping.routing (default none)
The document field/property name containing the document routing. To specify a constant, use the format.
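As a minimal sketch of how that setting would be supplied, assuming ES 1.x (where routing extraction is still supported): the option is just one more entry in the configuration map given to the integration. The field name "userId" and the cluster address are hypothetical, chosen only for illustration.

```java
import java.util.HashMap;
import java.util.Map;

class RoutingConfigSketch {
    // Builds the settings map that would be handed to the ES-Hadoop Storm bolt.
    // "userId" is a hypothetical document field, used only for illustration.
    static Map<String, String> routingConfig(String routingField) {
        Map<String, String> conf = new HashMap<>();
        conf.put("es.nodes", "localhost:9200");        // assumed cluster address
        conf.put("es.mapping.routing", routingField);  // field holding the routing value
        return conf;
    }

    public static void main(String[] args) {
        Map<String, String> conf = routingConfig("userId");
        System.out.println(conf.get("es.mapping.routing"));
    }
}
```

The resulting map would then be passed as the configuration argument when constructing the bolt; the exact constructor to use should be checked against the ES-Hadoop Storm documentation for your version.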

But it seems that this option is deprecated in ES 2.0+ versions as mentioned in :
WARNING: extracting custom routing from the document is no longer supported; it was removed in Elasticsearch v2.0. The ability to extract routing from the document itself can lead to strange edge-cases and potential trouble (e.g. if the field is multi-valued, which value do you use for routing?) If you are <v2.0, it is not recommended to use extraction.

So if that is true, is there any other way to do this in ES-Hadoop?

Appreciate your help !

So if that is true, is there any other way to do this in ES-Hadoop?

It is true. That functionality is for ES 1.X and likely will have to be removed going forward, at least for metadata, at least in its current form.
I've raised an issue for it here.

The metadata based routing was great IMO (i.e. specifying a field in the document itself on which routing-algorithm would calculate the hash to find out which shard they would go to).

Probably ES should not have removed the feature to begin with.

If multi-valued fields' hashing was the only concern (as mentioned in their documentation of removing this feature), they could just have disallowed multi-valued fields from being specified as a routing field.

Removing features is never easy. Moving metadata into the URL, as opposed to a bulk request param, has the benefit of dispatching the request without having to parse it.

So then there is no way ES-Hadoop can do custom routing while working with 2.x version of ES?

Currently, for ES 2.x this is an issue that needs resolving.

Hey Costin,

Is there a JIRA issue or a bug somewhere where we can track this feature?

Thank you,

Thank you Costin,

Do you know if the cluster.routing.operation.hash.type option is still supported by ES?

Can I use this to write a plugin that will contain the routing logic as used in ?

For my use case, I will make the plugin's hash-function return an integer representing the current shard always.

This has been deprecated in 2.x and removed in master as mentioned here.
It is an internal setting that can easily mess up your cluster with the wrong hash or if the cluster grows.

I'm not sure what you are trying to achieve. Instead of hacking ES to mimic the shard based on routing, it would be easier to ask/hack/improve ES-Hadoop to do what you want it to do.

Note that while routing is useful, it can easily be abused and one can end up with hotspots, which lead to all kinds of hard-to-diagnose problems.

Our use case is: Kafka -> Storm -> ES

If we specify a routing parameter (by patching ES-Hadoop), even then there could be one network hop from the shard receiving the batch request to the primary shard which has been chosen by hash(routing-field)%numShards.

We want to save this network hop too.

Currently each node receives 10 MB / request and distributes it out to all the other 10 primary shards.
With routing in ES-Hadoop, each node will receive 10 MB / request and forward it to another primary shard.
With routing plugin in ES, each node will receive 10 MB / request and consume it there itself.

With 10 nodes and 10 MB/min traffic, we will save around 100 MB/min traffic, which is not groundbreaking but not trivial either.

If that plugin was available in 2.x, we would not have needed any patching of ES or ES-Hadoop's core codebase, just a plugin and that would have been ideal.

But now, we need to see on which side we can patch routing.

When you mention routing - does that apply to your entire dataset or is it per entry?

I meant routing per batch request.

200,000 documents/minute to index, 10 ES-primary-shards and 20 ES-bolts.

Currently, each ES-bolt gets 10k documents and sends to one of the primary-shards.
The receiving primary-shard distributes those 10k documents to all the other primary-shards making it roughly 1000 docs/primary-shard.

Also, each primary-shard receives 1 indexing call from the client and 10 more indexing calls from other ES-primary shards.
Due to this the thread pool of each primary-shard fills in very quickly and we have to limit the number of bolts that can send data to ES-cluster.

If a routing option was available, every primary-shard would not distribute its batch to other primary-shards.
And so there would be fewer HTTP calls, more threads available / shard for indexing and a bigger bulk-request / shard (without routing, the size of the bulk request is also reduced to one-tenth for the final primary-shards, and that is not good).
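The numbers above can be checked with a short calculation. This is only a sketch, assuming documents are spread evenly across bolts and that each bulk request is split evenly across all primary shards; the class and method names are made up for illustration.

```java
class BulkFanOutSketch {
    // Documents each bolt sends per minute when the load is spread evenly.
    static int docsPerBolt(int docsPerMinute, int bolts) {
        return docsPerMinute / bolts;
    }

    // Size of each sub-request after the receiving node splits one bulk
    // request evenly across all primary shards (the no-routing case).
    static int docsPerShardPerBulk(int docsPerBulk, int primaryShards) {
        return docsPerBulk / primaryShards;
    }

    public static void main(String[] args) {
        int perBolt = docsPerBolt(200_000, 20);
        int perShard = docsPerShardPerBulk(perBolt, 10);
        System.out.println(perBolt + " docs per bolt, "
                + perShard + " docs per shard per bulk");
    }
}
```

With the figures from this thread, that gives 10,000 documents per bolt and 1,000 documents per primary shard for each bulk request, matching the description above.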

Please let me know if the above makes sense or we are doing something wrong.

But how would you define the routing? Once per bolt or inside the bolt every time for each request?
Is it dynamic or based on the content? I ask since it has a direct impact on how to configure ES-Hadoop.

Currently, this is what happens in ES-Bolt as per my understanding.

EsBolt.prepare() -> RestService.createWriter() -> RestService.initSingleIndex() -> RestRepository.getWriteTargetPrimaryShards() -> RestRepository.doGetWriteTargetPrimaryShards()

This last one calls the API: curl -XGET 'http://hostname:9200/storm/_search_shards'
This REST call returns all the nodes in the cluster along with the primary/replica status of their shards, and this function then filters out the non-primary shards from that data.

I plan to tap somewhere in the above call hierarchy and figure out a routing value for each primary-shard.
I will use the same Murmur3HashFunction as used internally in ES.
Something like this:

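// Package paths as of ES 2.x; they differ in other versions.
import org.elasticsearch.cluster.routing.Murmur3HashFunction;
import org.elasticsearch.common.math.MathUtils;

// Brute-force search for a routing value that hashes to the given shard.
String getRoutingValueForShard(int shardId, int numShards) {
    Murmur3HashFunction function = new Murmur3HashFunction();
    for (int i = 0; i < 100000; i++) {
        String hashKey = "routing_value" + i;
        int hashId = function.hash(hashKey);
        int bucketId = MathUtils.mod(hashId, numShards);
        if (bucketId == shardId) {
            return hashKey;
        }
    }
    return null; // no matching key found within the search limit
}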

So the above function will give me a mapping of the routing-value to use per IP address.
Once I have that, I will append the same to each HTTP call I make from each bolt.
This will make sure the http request lands with just the right routing-value on each primary-shard.
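A self-contained sketch of that plan follows: build one routing value per shard, then append it to the bulk URL as a `routing` query parameter. The hash here is a stand-in (String.hashCode), NOT the Murmur3 function ES uses internally, so the values it finds will not match real shard assignments. The class name, method names, host, and index are also assumptions for illustration.

```java
import java.util.LinkedHashMap;
import java.util.Map;

class RoutingTableSketch {
    // Stand-in for ES's internal hash; NOT Murmur3, so real shard
    // assignments will differ from what this produces.
    static int standInHash(String key) {
        return key.hashCode();
    }

    // Brute-force search for one routing value per shard id.
    static Map<Integer, String> routingValuePerShard(int numShards, int maxTries) {
        Map<Integer, String> table = new LinkedHashMap<>();
        for (int i = 0; i < maxTries && table.size() < numShards; i++) {
            String key = "routing_value" + i;
            int shard = Math.floorMod(standInHash(key), numShards);
            table.putIfAbsent(shard, key); // keep the first value found per shard
        }
        return table;
    }

    // Appends the routing value as a query parameter on the bulk endpoint.
    static String bulkUrl(String host, String index, String routingValue) {
        return "http://" + host + "/" + index + "/_bulk?routing=" + routingValue;
    }

    public static void main(String[] args) {
        Map<Integer, String> table = routingValuePerShard(10, 100_000);
        System.out.println(bulkUrl("hostname:9200", "storm", table.get(0)));
    }
}
```

Computing the table once up front, rather than searching on every request, keeps the per-request cost to a map lookup. The table would have to be rebuilt whenever the number of primary shards changes.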

I know this solution suffers from some problems like ES cluster rebalance etc. but those can be handled correctly once the basic routing as per the above logic is working.

Also worth mentioning is that without routing, the scalability of ES-cluster is affected a lot.
As the number of primary-shards increases, the number of http requests increases too.

Basically, without routing, n*n extra http requests are made for a cluster with n primary shards.

With routing, there will be 0 extra http requests.

@Sachin We're talking past each other.
You keep talking about why routing is important to you and how you plan to fix it which is not answering my questions.
It looks like you simply want the calls to go only to your primary shards regardless of your document - at least that's what your code above does.
Which completely ignores the document being used or the routing key which is what this thread is about.
So which one is it?

As for your conclusion on the impact of scalability - the whole point of adding nodes to a cluster is to spread the load across these nodes.
I'm not sure how you ended up with n*n extra http requests - ES-Hadoop only writes to primary shards. And in fact, the number of connections within a job is determined by the number of write instances, which are created upstream (in Hadoop/Spark/Storm).

Don't get me wrong, if you want to patch the code and if that fixes your problem, by all means go ahead and do it.
However I'm trying to understand your use case better so maybe it can be incorporated upstream.

I am sorry if there was some confusion but I have always been talking about the same thing:

"Route documents to a particular shard and index/consume them on that very primary-shard.
Different bolts will route to different primaries and all those primary shards will consume/index the documents locally.
Such kind of routing works for us because we will never search by ID"


Which completely ignores the document being used or the routing key which is what this thread is about.
So which one is it?

I do not understand this. If you can elaborate a little more on that, I would be happy to clarify.


I'm not sure how you ended up with n*n extra http requests - ES-Hadoop only writes to primary shards. And in fact, the number of connections within a job is done by the number of write instances which is created upstream (in Hadoop/Spark/Storm).

As far as I know, n*n requests should happen within the ES-cluster without routing.
Please see the example I have provided in above comment.

Basically, without routing, each ES-primary contacts all other ES-primaries (assume no replicas for simplicity) for every bulk-indexing request it receives. Hence n*n extra requests.
And with routing, no primary-shard contacts other primary-shards to distribute its bulk-indexing request. Hence 0 extra http requests.

Another downside of not having routing is that if a primary-shard is slow or rejecting requests for whatever reason, all the other primary-shards sending partial-indexing-requests to it will report failures. Due to this, the retry logic would be triggered in all the clients/bolts.
If routing was there, only the bolts feeding a slow primary would get failures and only those would trigger retry logic. (This becomes more pronounced when the number of clients sending indexing requests is high, as it is with Storm).