Bulk indexing: Load balancing over bulk queue size?

Hi there,
we have a small cluster of 6 machines with a heterogeneous number of cores per machine: 1x16, 3x24 and 2x48 cores including hyperthreading are available (194 in total). Furthermore, other jobs run on some of the machines from time to time, so ES does not get 100% of the machine's power during those periods.
On the node side, we pass -E processors=nproc -E thread_pool.bulk.size=nproc as command-line parameters (nproc = CPU count) to avoid the ES limit of 32 cores on the big machines. Furthermore, we set thread_pool.bulk.queue_size to 300, which is ~1.5x the number of cores in the cluster and could, in theory, hold all incoming concurrent bulk requests we configured.
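
For reference, the node startup then looks roughly like this (shown here for one of the 48-core machines; the values are just the ones described above substituted in):

```
bin/elasticsearch \
  -E processors=48 \
  -E thread_pool.bulk.size=48 \
  -E thread_pool.bulk.queue_size=300
```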

Client: We use the Java transport client with bulk indexing and a bulk size of 1.5 MB (which results in ~20-35 docs per bulk). A single document is hard for ES to process (several seconds each), since we have a big geo_shape field with a linestring of up to 2000 GPS points to index. We configured 291 concurrent requests (1.5x the number of cores). The idea behind this is that each machine has, on average, all cores busy processing bulk requests and half its core count waiting in the queue, which works fine in a single-shard-index scenario.
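
For context, a minimal sketch of this client-side setup with the transport client's BulkProcessor; cluster name, host name and index/type names are placeholders, and the listener is reduced to the bare minimum:

```java
import org.elasticsearch.action.bulk.BulkProcessor;
import org.elasticsearch.action.bulk.BulkRequest;
import org.elasticsearch.action.bulk.BulkResponse;
import org.elasticsearch.action.index.IndexRequest;
import org.elasticsearch.client.transport.TransportClient;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.transport.InetSocketTransportAddress;
import org.elasticsearch.common.unit.ByteSizeUnit;
import org.elasticsearch.common.unit.ByteSizeValue;
import org.elasticsearch.common.xcontent.XContentType;
import org.elasticsearch.transport.client.PreBuiltTransportClient;

import java.net.InetAddress;

public class BulkIndexer {

    public static void main(String[] args) throws Exception {
        Settings settings = Settings.builder()
                .put("cluster.name", "my-cluster")       // placeholder
                .put("client.transport.sniff", true)     // we tried both true and false
                .build();
        TransportClient client = new PreBuiltTransportClient(settings)
                .addTransportAddress(new InetSocketTransportAddress(
                        InetAddress.getByName("node1"), 9300));

        BulkProcessor bulkProcessor = BulkProcessor.builder(client, new BulkProcessor.Listener() {
            @Override public void beforeBulk(long id, BulkRequest request) { }
            @Override public void afterBulk(long id, BulkRequest request, BulkResponse response) { }
            @Override public void afterBulk(long id, BulkRequest request, Throwable failure) {
                failure.printStackTrace();               // e.g. queue rejections end up here
            }
        })
                .setBulkActions(-1)                                     // flush by size only
                .setBulkSize(new ByteSizeValue(1536, ByteSizeUnit.KB))  // ~1.5 MB per bulk
                .setConcurrentRequests(291)                             // 1.5x the cluster's core count
                .build();

        String json = "{\"name\":\"example\"}"; // placeholder; real docs carry the big geo_shape linestring
        bulkProcessor.add(new IndexRequest("tracks", "track").source(json, XContentType.JSON));

        bulkProcessor.close();
        client.close();
    }
}
```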
We tried this with all machines connected to the client, with sniffing both on and off, and also with only a single node connected, hoping that this node would do the load-balancing job. No big differences.

Now we have the situation that the bulk queues on the machines with fewer cores become very full (over 250 items), while the queues on the big machines are empty and those machines are idle most of the time. It looks as if all requests are simply distributed equally across the nodes (round robin), regardless of whether a node can handle them or not. Since the insert threads on the client side wait for their bulks to finish, they are stuck waiting on the slow machines' queues before they can hand a new item to a fast machine. If we added more threads, the queue size on the slow machines would be exceeded, leading to a TransportException.

In our scenario, a load-balancing solution could be simple: just give the next bulk to the node with the smallest current queue size. Would this be possible somehow, e.g. in the Java client? Sending to the machine with the smallest load would also be fine.
Nevertheless, we are surprised to run into this, because load balancing should be more or less a solved problem in ES, shouldn't it? And having heterogeneous machines in a cluster should also be common, so there is probably a solution we are just not aware of.

Thanks for reading - any solution?

We actually recommend clusters with homogeneous machines. We also recommend not running other applications on the same machines. You're hitting one issue here, and you might hit others if you choose to continue using this cluster architecture in the future.

The Java client only supports round-robin mode. You would have to write a custom request balancer.

Thanks for your answer! Can you give me an entry point for how to write a custom request balancer? Is it on the client or the server side? Is there, e.g., a Java client method where I can replace the current round-robin balancer?

Unfortunately the code is not pluggable in that regard. I don't have a good solution for this. Let's see what others on the forum are going to suggest.

Would it be possible to use document routing for this? That is, if I know that a specific routing value A maps to a specific shard SA, and I have six routing values for the 6 shards, combined with the information about the current bulk queue sizes?

One problem I see is that the routing value is per document, not per bulk - so the bulk would be round-robined again, right? Maybe we should skip bulk indexing and use simple per-document indexing, which is not much worse in our case, where one document takes several seconds to process anyway?

For everyone who has the same problem: I was able to solve the load-balancing issue with document routing. Though it is not an ideal solution, it works for our run now. Here are the steps:

  1. Find a routing value for each shard: I simply implemented a loop that invokes the ES hash function on the loop variable, modulo the number of shards: Murmur3HashFunction.hash(String.valueOf(i)) % 6. For six shards, I found these values (the first hit per shard, shardIndex=routingVal): {0=0, 1=56, 2=9, 3=11, 4=8, 5=46}. (See the sketch after this list.)
  2. Every n seconds, determine the current size of each node's bulk queue and take the host with the lowest queue size. Pick the corresponding shard number and its routing value.
  3. Route every document to that low-queue shard by setting the picked routing value.
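
For illustration, a sketch of step 1, assuming an index created with 6 shards and the default routing (the class name is made up); Math.floorMod is used because the murmur3 hash can be negative:

```java
import org.elasticsearch.cluster.routing.Murmur3HashFunction;

import java.util.Map;
import java.util.TreeMap;

public class RoutingValueFinder {

    /**
     * For each shard of an index with the given shard count, find the first
     * routing value (a small integer as string) that ES's default routing
     * (murmur3 hash modulo shard count) maps onto that shard.
     */
    public static Map<Integer, String> findRoutingValues(int numShards) {
        Map<Integer, String> shardToRouting = new TreeMap<>();
        for (int i = 0; shardToRouting.size() < numShards; i++) {
            String routing = String.valueOf(i);
            int shard = Math.floorMod(Murmur3HashFunction.hash(routing), numShards);
            shardToRouting.putIfAbsent(shard, routing);
        }
        return shardToRouting;
    }

    public static void main(String[] args) {
        // For 6 shards this prints the shardIndex=routingValue map from step 1.
        System.out.println(findRoutingValues(6));
    }
}
```

Each IndexRequest is then given the routing value of the currently least-loaded shard, e.g. via IndexRequest.routing(...) or setRouting(...) on the request builder.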

Our queues are now filled evenly, the weak machines don't fall behind, and the strong machines are not idle. Of course, the strong machines will need more disk space because they process more docs, but that is not relevant for us. If you have an index with a different shard count, you will have to recalculate the routing values.

It would be quite easy for Elastic to add such load balancing, maybe as an optional alternative. We think it is not uncommon for small and medium-sized companies to have a pool of machines with different power, with other processes running on them. For us as a research institute, it is nearly impossible to get exclusive access to a big homogeneous cluster of machines. We can only grow our department's computing cluster over time, so bigger machines will be added in the future.

Instead of determining the current size of a node's bulk queue every n seconds, wouldn't it be simpler to collect

  • each data node address (for direct connection)
  • the maximum bulk queue size of each data node
  • the CPU processor count of each data node
  • shard distribution over the data nodes

once before bulk indexing starts, to compute weights per data node?

With these weights, modifying the transport client source code from round-robin to weighted fair queueing (WFQ) should do it. Maybe something like a java.util.concurrent.PriorityBlockingQueue with a weight comparator (rough sketch at the end of this post).

In the HTTP/2 server world, WFQ is used for prioritized content serving to clients, e.g. https://netty.io/4.1/api/io/netty/handler/codec/http2/WeightedFairQueueByteDistributor.html
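
Not a patch against the transport client, just a rough sketch of the idea with made-up class names: each data node gets a static weight (e.g. derived from its core count and maximum bulk queue size, collected once up front), and the dispatcher always picks the node with the most spare weighted capacity.

```java
import java.util.Comparator;
import java.util.List;
import java.util.concurrent.PriorityBlockingQueue;
import java.util.concurrent.atomic.AtomicInteger;

// Made-up sketch of weighted node selection; this is not part of the ES client API.
public class WeightedNodeSelector {

    static final class WeightedNode {
        final String address;            // data node address for a direct connection
        final int weight;                // e.g. cores * max bulk queue size, collected once up front
        final AtomicInteger inFlight = new AtomicInteger();

        WeightedNode(String address, int weight) {
            this.address = address;
            this.weight = weight;
        }

        double load() {                  // lower value = more spare weighted capacity
            return inFlight.get() / (double) weight;
        }
    }

    private final PriorityBlockingQueue<WeightedNode> queue;

    public WeightedNodeSelector(List<WeightedNode> nodes) {
        this.queue = new PriorityBlockingQueue<>(Math.max(1, nodes.size()),
                Comparator.comparingDouble(WeightedNode::load));
        this.queue.addAll(nodes);
    }

    /** Pick the node with the most spare weighted capacity for the next bulk request. */
    public WeightedNode acquire() {
        WeightedNode node = queue.poll();
        node.inFlight.incrementAndGet();
        queue.offer(node);               // re-insert so the ordering reflects the new load
        return node;
    }

    /** Call when the bulk response (or failure) for that node's request has arrived. */
    public void release(WeightedNode node) {
        queue.remove(node);
        node.inFlight.decrementAndGet();
        queue.offer(node);
    }
}
```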

This would only work if we were the only ones on the cluster machines. Since other processes produce load on individual machines from time to time, we have to load balance dynamically.

Do the other processes use the ordinary client API, or would it be possible for them to use a modified client API?

The point is, it should not be the client that decides about the load by polling this information at an arbitrarily chosen interval. This should be a server-side decision. For example, the bulk responses in the current ES client API do not return the used and free bulk queue capacity (or other load-related information), so I think this information belongs in the bulk response. A modified client could then decide which node to select for the next dispatch of a bulk request. This assumes a "direct connection", meaning a connection is used exclusively for bulk requests that are addressed to shards on that node (no forwarding).

If an ordinary client connects, the node could throttle bulk responses when bulk requests arrive too fast, as a primitive way to protect itself against bulk queue rejections. But since clients are not obliged to receive and evaluate bulk responses, this approach might be ineffective.

The other processes that produce load on the machines have nothing to do with Elasticsearch - they come from colleagues working on other projects in our department. Still, because of this additional machine load, the Elasticsearch process gets less throughput.
I make an explicit call to Elasticsearch via '_nodes/stats/thread_pool' to determine the current queue fill levels (see the sketch below). This would also work if several clients did so; it's just a bit more 'intelligence' than the round robin the client currently does.
But you are perfectly right - it would be even nicer if the Elasticsearch server offered the load balancing itself.
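
For completeness, the same queue levels can also be read with the transport client instead of calling the REST endpoint by hand; a minimal sketch (how the 'best' node is then chosen is left out):

```java
import org.elasticsearch.action.admin.cluster.node.stats.NodeStats;
import org.elasticsearch.action.admin.cluster.node.stats.NodesStatsResponse;
import org.elasticsearch.client.Client;
import org.elasticsearch.threadpool.ThreadPoolStats;

import java.util.HashMap;
import java.util.Map;

public class BulkQueueMonitor {

    /** Returns the current bulk queue size per node name (equivalent to _nodes/stats/thread_pool). */
    public static Map<String, Integer> currentBulkQueueSizes(Client client) {
        NodesStatsResponse response = client.admin().cluster()
                .prepareNodesStats()
                .clear()
                .setThreadPool(true)
                .get();

        Map<String, Integer> queueSizes = new HashMap<>();
        for (NodeStats nodeStats : response.getNodes()) {
            for (ThreadPoolStats.Stats stats : nodeStats.getThreadPool()) {
                if ("bulk".equals(stats.getName())) {
                    queueSizes.put(nodeStats.getNode().getName(), stats.getQueue());
                }
            }
        }
        return queueSizes;
    }
}
```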

I'm not sure throttling would help, because the client threads wait for their bulks until they are finished. If throttled, they would wait even longer and would not be free to put work on the stronger machines. If you increase the number of client threads to compensate, you will get bulk rejections again at some point.

Yes, for ordinary clients it simply does not work. There are other murky details; for example, bulk requests never time out. Also, the transport client's "exponential backoff mechanism" allows retrying bulk requests, but only if a "soft" exception was returned, and it waits longer the more errors are returned. For clients it is not transparent whether they are submitting erroneous requests, sending too many bulk requests to a specific node, or whether the whole cluster is under pressure.

A modified client would have to choose between strategies, at least a fail-fast mode and throttled modes. Depending on how the client is connected to the nodes and how it prepares bulk requests, my suggestion for a modified client would be (a rough sketch follows the list):

  • fail-fast mode: if a bulk request times out or the cluster sends a bulk response with errors, immediately stop sending more bulk requests and put the application into an error state. The application decides how to continue.
  • node throttle mode for direct connections: if one node times out or sends a bulk response with errors, wait for a while and try again, but only on this node.
  • cluster throttle mode for load-balanced/intermediated connections: if a bulk request times out or the cluster sends a bulk response with errors, suspend the related nodes, wait for a while, and try again, preferably on other nodes. Do not communicate exceptions to the application unless there is no node left to communicate with. (This cluster-wide throttling mode is how the ordinary transport client should work - but there is no bulk request timeout.)
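
Purely illustrative and not existing client code, the three modes could be exposed as a simple strategy that such a modified client consults whenever a bulk request times out or a bulk response reports errors (the enum and its names are hypothetical):

```java
/**
 * Hypothetical sketch only - nothing like this exists in the ES client today.
 * A modified client's bulk dispatcher would consult the configured mode whenever
 * a bulk request times out or a bulk response reports errors.
 */
public enum BulkFailureMode {

    /** Immediately stop sending bulk requests and put the application into an error state. */
    FAIL_FAST,

    /** Direct connections: back off for a while, then retry, but only on the failing node. */
    NODE_THROTTLE,

    /** Load-balanced/intermediated connections: suspend the failing node, back off, retry on
     *  other nodes; only surface an exception when no node is left. */
    CLUSTER_THROTTLE
}
```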
