Bulk queue_size

We are using ES 1.7 in production with 3 nodes. We were planning to increase the bulk.queue_size parameter from the default of 50 to 100 when I noticed this. Notice how bulk.active, bulk.queue and bulk.rejected are all happening on only one node (hostname3) of the cluster. That doesn't look normal to me. How do I make those requests spread out across the nodes?

curl 'http://localhost:9200/_cat/thread_pool?v'
host ip bulk.active bulk.queue bulk.rejected index.active index.queue index.rejected search.active search.queue search.rejected
hostname1 127.0.1.1 0 0 0 0 0 0 0 0 0
hostname2 127.0.1.1 0 0 0 0 0 0 1 0 0
hostname3 127.0.1.1 32 48 323947490 0 0 0 0 0 0
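
For reference, the change we were planning was just to bump the bulk thread pool queue through the cluster settings API (if I remember right, thread pool settings are dynamically updatable on 1.x; otherwise it would go into elasticsearch.yml and need a restart):

curl -XPUT 'http://localhost:9200/_cluster/settings' -d '{
"transient": { "threadpool.bulk.queue_size": 100 }
}'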

Hi caddala,

How many indices do you have and how many shards in each index?

We have 3 indices, each with 5 primary shards and 2 replicas.

index1 -> 34560668 documents
index2 -> 179328171 documents
index3 -> 60

What the bulk.rejected count is telling you is that the cluster doesn't have the resources to index the volume of events at the speed you're feeding them in.

What disk configuration do you have? SSDs, and are they in any RAID configuration?

And what are the specs of the hosts?

Also, what search volume is this cluster currently handling? Queries per second, etc.?

Also note that only one node has an excessive rejected count. Are you pointing the indexing workload at just one node, or are you balancing the indexing work across all of them?

We are using SSD-backed nodes with 125GB of RAM.

The index load is supposed to be balanced across all nodes. I am using the Java API to interact with Elasticsearch. I am instantiating the client as shown below: I add all the nodes to the TransportClient and then do bulk operations. My expectation is that the bulk requests should be balanced across all three nodes. What am I missing?

TransportClient tc = new TransportClient(settings);
for (String host : hosts) {
    // each entry in hosts is of the form "hostname:port"
    String[] parts = host.split(":");
    String address = parts[0];
    int port = Integer.parseInt(parts[1]);
    tc.addTransportAddress(new InetSocketTransportAddress(address, port));
}
client = tc;

BulkRequestBuilder bulk = client.prepareBulk();
bulk.add(...); // I call this several times
bulk.execute().actionGet();

If you only add one node to your indexing client (not the node with the errors; leave that one out), how does your bulk indexing test run then?

What does curl 'http://localhost:9200/_cat/thread_pool?v' look like before/after with a single node configured in the client?

Also, can you set your indexing client to full debug logging to look for more clues?

How much memory have you got configured for the heap?

Oh and what's your document size?

There are also some great indexing performance tips to go through listed here:
https://www.elastic.co/guide/en/elasticsearch/guide/current/indexing-performance.html
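
For example, one of the bigger wins on that page is relaxing the index refresh interval while you do the heavy bulk loading and putting it back afterwards. Roughly (my_index is a placeholder for your index name):

curl -XPUT 'http://localhost:9200/my_index/_settings' -d '{
"index": { "refresh_interval": "-1" }
}'

Set it back to the default of "1s" once the bulk load is done.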

Thanks

I finally got a chance to try this. If I remove one node, Elasticsearch just picks another node to handle all the bulk requests.

I made an observation which I wonder is the cause of this behavior: of the 3 nodes I have, all the primary shards of the index I am bulk inserting into are on one node (hostname3). Could this be the reason why I see bulk active, queue and rejected counts only on that node?

If that is the case, could I move some of the primary shards from hostname3 to the other nodes to distribute the bulk inserts? If so, since each node has its own bulk thread pool queue of 50, I would hope to see far fewer bulk rejections.
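
For reference, the shard placement shows up in _cat/shards; the prirep column marks a primary with p and a replica with r:

curl 'http://localhost:9200/_cat/shards?v'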

If you can spread those shards out to other nodes, it will likely spread that load.

A quick and dirty way is to manually reroute one of the shards to another node, though Elasticsearch might immediately balance it back:
https://www.elastic.co/guide/en/elasticsearch/reference/current/cluster-reroute.html
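
The move command looks roughly like this; the index, shard number and node names below are just placeholders for your own:

curl -XPOST 'http://localhost:9200/_cluster/reroute' -d '{
"commands": [
{ "move": { "index": "my_index", "shard": 0, "from_node": "hostname3", "to_node": "hostname1" } }
]
}'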

You can force the primaries to spread out by disabling replicas, letting things reallocate, then adding them back.

Let me know how the replica remove/re-add goes.

NOTE: Removing replicas reduces your high availability: if you lose a node while there are no replicas, that data is gone, so you have to assess the risk this involves for your use case. It might be good to suspend updates, take a snapshot, then remove and re-add the replicas, and only then resume updates to the cluster. That way, if you do lose a node during the procedure, you can restore back to a known good data set. But losing a node means an outage for your cluster: a partial outage for the data missing from the failed node(s), and possibly a full outage while performing the restore until all the shards are present and accounted for. Not processing updates for the time it takes to rebalance and then recreate the replicas might also count as an outage, depending on your use case.
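
For the snapshot step, something along these lines should work on 1.7, assuming you have a shared filesystem path whitelisted under path.repo in elasticsearch.yml (the repository name and location below are placeholders):

curl -XPUT 'http://localhost:9200/_snapshot/my_backup' -d '{
"type": "fs",
"settings": { "location": "/mount/backups/my_backup" }
}'

curl -XPUT 'http://localhost:9200/_snapshot/my_backup/before_replica_change?wait_for_completion=true'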

Thx for your help!

Here is what I did. I disabled the replicas using the command below, which rebalanced the primary shards across the nodes. It was pretty smooth.

curl -XPUT 'http://localhost:9200/my_index/_settings' -d '{
"number_of_replicas": 0
}'

I changed the number of replicas back to 2 after that, and now both primary and replica shards are spread across the nodes. Bulk insert traffic is now spread out and the number of bulk rejections has gone down a lot.
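
Adding the replicas back was the same settings call, just with 2:

curl -XPUT 'http://localhost:9200/my_index/_settings' -d '{
"number_of_replicas": 2
}'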

curl 'http://localhost:9200/_cat/thread_pool?v'
host ip bulk.active bulk.queue bulk.rejected index.active index.queue index.rejected search.active search.queue search.rejected
hostname2 127.0.1.1 5 0 0 0 0 0 0 0 0
hostname1 127.0.1.1 2 0 0 0 0 0 0 0 0
hostname0 127.0.1.1 5 1 230 0 0 0 0 0 0
