We are indexing around 7TB on a daily basis.
All the indices are replaced once a day with new ones (fresh data).
Each index represents one customer (business).
Index sizes vary widely, from a few kilobytes up to around 500 gigabytes.
Once indexed, an index receives no more writes.
It is used for search only until a new index is ready (the next day) and replaces it.
We use an alias per index (when a new index is ready, we flip the alias and delete the old one).
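The flip-and-delete step can be sketched with the Python client like this (a minimal sketch; the function name and index names are illustrative, not our actual code):

```python
def flip_alias(es, alias, old_index, new_index):
    """Atomically repoint `alias` from old_index to new_index, then
    delete the old index. `es` is an elasticsearch-py 8.x client."""
    # Both actions go in one update_aliases call, so searches never
    # observe a moment where the alias is missing or points at both.
    es.indices.update_aliases(actions=[
        {"remove": {"index": old_index, "alias": alias}},
        {"add": {"index": new_index, "alias": alias}},
    ])
    es.indices.delete(index=old_index)
```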
The "new"/"ready" indices are relocated to a dedicated node group (we call it "data-srv").
We have 4 nodes (64GB RAM, 8 cores, 2.5TB disk) in this group.
The last step before replacing an index with a fresh one is changing its replication factor from zero to one. There can be dozens of indices trying to add replicas at the same time.
Recently we noticed that a lot of shards become unassigned during that window. Analyzing, we see them in UNASSIGNED state with REPLICA_ADDED as the reason.
It seems the numbers of initializing and relocating shards are limited, but we couldn't figure out how this limitation works.
We assumed there's a limit / configuration involved, and we tried to tweak these values:

indices.recovery.max_bytes_per_sec: 150mb                  # default: 40mb
cluster.routing.allocation.node_concurrent_recoveries: 16  # default: 2

The intuition was that changing these would increase parallelism/concurrency.
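For reference, both are dynamic settings, so they can be applied (and reverted) at runtime through the cluster settings API instead of elasticsearch.yml; the values here are just the ones we tried:

```
PUT _cluster/settings
{
  "persistent": {
    "indices.recovery.max_bytes_per_sec": "150mb",
    "cluster.routing.allocation.node_concurrent_recoveries": 16
  }
}
```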
Then, we saw the following logs:

Unable to acquire permit to use snapshot files during recovery, this recovery will recover index files ...

We reverted this change (after reading this) and now we wait a very long time for replicas.

  1. Any ideas / suggestions on how to increase parallelism without getting those warning logs?
  2. Is it worth "marking" the indices as read-only somehow? Will it improve performance / make operations faster?
  1. The throttling is deliberate and desirable. Creating replicas is a lot of work, and if you do more of them at once then it may actually increase the total time it takes to create them all. But you can reasonably increase indices.recovery.max_bytes_per_sec as long as your hardware can cope and the extra bandwidth usage doesn't adversely affect any other operations.

  2. Yes it's worth marking read-only indices as such. That won't itself speed up the recovery process, but it may help to ensure that the indices have reached a state in which recovery can be as efficient as possible.

Also it looks like you're using snapshot-based recoveries, so it would be good to take a snapshot before adding replicas.

I am not doing any such thing (not intentionally), so I guess I don't fully understand what's happening.
I'll try to elaborate more on the process; maybe it will clear some things up.
serving nodes are defined with:

node.attr.zone: SERVER

and indexing nodes:

node.attr.zone: WORKER

Each index is created with routing.allocation.include.zone=WORKER.
After indexing, we perform a forcemerge to 1 segment and relocate the index's shards by changing the setting to routing.allocation.include.zone=SERVER.
Then we change the replication factor from zero to one and flip the alias as a final step.
We make some additional requests during that process: to the tasks API (to make sure the forcemerge is done), to the cat shards API (to make sure relocation is done) and to the cluster health API (to make sure replication is done).
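Roughly, the post-indexing part of the pipeline looks like this (a simplified sketch with the elasticsearch-py 8.x client; the function name and polling interval are illustrative, not our actual code):

```python
import time

def promote_index(es, index, poll_secs=30):
    """Forcemerge, relocate to the serving tier, then add a replica."""
    # Merge down to a single segment once indexing is finished.
    es.indices.forcemerge(index=index, max_num_segments=1)

    # Move the shards onto the SERVER zone, then poll the cat shards
    # API until no shard of this index is still relocating.
    es.indices.put_settings(
        index=index,
        settings={"index.routing.allocation.include.zone": "SERVER"},
    )
    while any(row["state"] != "STARTED"
              for row in es.cat.shards(index=index, format="json")):
        time.sleep(poll_secs)

    # Raise the replication factor and wait for the index to go green.
    es.indices.put_settings(index=index,
                            settings={"index.number_of_replicas": 1})
    es.cluster.health(index=index, wait_for_status="green", timeout="30m")
```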

Are any of those operations related to snapshots somehow?


After finishing indexing, I would use the index blocks API to make them read-only. Then flush. Then force-merge. Don't rely on the tasks API to detect when force merge is complete, wait for a success response from the force merge API. If you get a timeout then retry when there are no more running tasks (a no-op force merge is fast). Then take a snapshot. Then adjust the allocation settings and add replicas at the same time, no need to do them in sequence.
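The suggested order might look something like this (a hedged sketch, not production code; `repo` and `snapshot_name` are hypothetical, and the retry is deliberately crude):

```python
def finalize_index(es, index, repo, snapshot_name, max_attempts=10):
    """Block writes, flush, force-merge (waiting for a success
    response), snapshot, then relocate and add replicas together."""
    es.indices.add_block(index=index, block="write")  # stop writes
    es.indices.flush(index=index)                     # drop the translog

    # Wait for a success response from force-merge itself; on a
    # client-side timeout, retry (a no-op force-merge returns quickly).
    for _ in range(max_attempts):
        try:
            resp = es.indices.forcemerge(index=index, max_num_segments=1)
        except ConnectionError:  # stand-in for a client timeout error
            continue
        if resp["_shards"]["failed"] == 0:
            break

    # Snapshot first, so later recoveries can pull files from the repo.
    es.snapshot.create(repository=repo, snapshot=snapshot_name,
                       indices=index, wait_for_completion=True)

    # Allocation change and replica count in a single settings update.
    es.indices.put_settings(index=index, settings={
        "index.routing.allocation.include.zone": "SERVER",
        "index.number_of_replicas": 1,
    })
```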

Thanks. We'll do our homework and update.

We already do the block writes :slight_smile:

    await es_client.indices.put_settings(
        settings={"index.blocks.write": True}, index=index_name
    )
  • Can you explain (out of curiosity) why we should do another flush?
  • BTW, we are not doing an explicit flush / refresh during bulks in the indexing part; not sure if that's OK. Would love your feedback on that too.
  • Still not sure about the snapshot. Are we talking about snapshots to S3? Why do I need that / how does it help?
  • Changing the replicas and allocation at the same time is interesting; will it be faster that way? Isn't a stable / active primary shard (green) a requirement for adding a replica? How can relocation and replication occur together?

Not using the index blocks API, though. It probably doesn't make much difference without replicas, but I still recommend using the blocks API.

No sense in keeping the translog around once you've finished indexing.

Makes sense, one at the end is sufficient.

Yes (or something like S3 anyway). Recovery (including relocation and adding replicas) can use the data in S3 rather than copying it between nodes, which saves data-transfer costs and reduces load on the cluster.
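For context (an illustrative snippet; the repository and bucket names are made up): snapshot-based recovery needs a registered snapshot repository, e.g. an S3 one, and from 7.15 onwards recoveries use matching snapshot data automatically (`indices.recovery.use_snapshots`, default `true`):

```
PUT _snapshot/my_s3_repo
{
  "type": "s3",
  "settings": {
    "bucket": "my-es-snapshots"
  }
}
```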

It's the same process under the hood. Elasticsearch knows how to do these things correctly and in parallel.

We switched the call to:

await es_client.indices.add_block(
    index=index_name, block="read_only"
)

but then it fails (later on) while trying to change the settings to relocate/replicate shards:

        await es_client.indices.put_settings(
            index=index_name,
            settings={"index.routing.allocation.include.zone": "SERVER", "index.number_of_replicas": 1},
        )


elasticsearch.AuthorizationException: AuthorizationException(403, 'cluster_block_exception', 'index [my_index_name] blocked by: [FORBIDDEN/5/index read-only (api)];')

Should I change it back to block="write" instead? Should I do another call with block="read_only" as a final step?

Previously you were using a write block. You probably don't want to change that?

I did it because of this error, I guess (a long time ago).
Is there a benefit to blocking an index as read_only compared to write? Will it save Elasticsearch some work under the hood later on? I can change it to read_only after flipping the alias...

No, I wouldn't expect that to make any difference to performance.

What's wrong with the tasks API? (LGTM)
What's the recommended way to know when there are no more running tasks? (Other than the tasks API)
no-op == block?

The tasks API will tell you if the force-merge is no longer running, but will not tell you whether it succeeded or not. For that, you need a response from the force-merge API itself.
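In other words, success has to be read off the force-merge response body itself, e.g. (a trivial helper, assuming the standard `_shards` section of a force-merge response):

```python
def forcemerge_succeeded(resp):
    """True only if every shard reported success in the force-merge
    response itself; the tasks API's `completed` flag alone does not
    distinguish success from failure."""
    shards = resp["_shards"]
    return shards["failed"] == 0 and shards["successful"] == shards["total"]
```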

{'completed': False, 'task': {'node': 'd0cYkQjfTwuiwLhaahGYnA', 'id': 6490758, 'type': 'transport', 'action': 'indices:admin/forcemerge', 'description': 'Force-merge indices [9876115_23-06-21_06-41-53], maxSegments[1], onlyExpungeDeletes[false], flush[true]', 'start_time_in_millis': 1687348486083, 'running_time_in_nanos': 590414547695, 'cancellable': False, 'headers': {}}} [post_indexing_tasks.py:75]
{'completed': True, 'task': {'node': 'd0cYkQjfTwuiwLhaahGYnA', 'id': 6490758, 'type': 'transport', 'action': 'indices:admin/forcemerge', 'description': 'Force-merge indices [9876115_23-06-21_06-41-53], maxSegments[1], onlyExpungeDeletes[false], flush[true]', 'start_time_in_millis': 1687348486083, 'running_time_in_nanos': 593078267964, 'cancellable': False, 'headers': {}}, 'response': {'_shards': {'total': 1, 'successful': 1, 'failed': 0}}} [post_indexing_tasks.py:75]

Here are the last two calls to the tasks API; the last call indicates the operation completed. Do you mean that completed isn't necessarily the same as succeeded?

The thing is that my Python code that sends these operations runs on spot instances and can potentially die in the middle.
Using the tasks API today, I can find a running forcemerge task for a specific index and wait for it to finish (if my spot instance was replaced).
A big index (500GB) can take a lot of time, compared to a small one (a few MB).
BTW, is there a formula to understand how many of those forcemerge tasks can run in parallel?
In addition, does this operation involve the coordinating nodes somehow? I see that it works much faster when I go through dedicated coordinating nodes (compared to calling the data+coordinating nodes directly, without dedicated ones).

Last thing, thank you thank you thank you! I super appreciate your help! :slight_smile:

It's not the index size that matters, it's the size of the merge. If there are no merges to do then you'll get a response quickly even on a 500GiB index. That's why I think it's a good idea to retry, just to check that there's no more work to do.

I don't think there's a limit.

I don't think so - at least all the work happens on the data nodes.

So the "limit" we are facing (UNASSIGNED shards) is due to replication?
What does the node_concurrent_recoveries limit (default 2) apply to? 2 shards per node? Per cluster? Is it about adding replicas? Relocating shards?

node_concurrent_recoveries is the number of concurrent recoveries per node, where "recovery" means anything which creates a shard on the node (which includes relocations and creating new replicas).

This is rarely the limiting factor and I don't recommend changing it from the default of 2, because that's normally enough to saturate your IO bandwidth.

Assuming I improve my IO bandwidth (I'm using AWS EBS gp3 and I can provision more IOPS/throughput), can I raise it a bit? To 3-4? We know that this Elasticsearch setting is our bottleneck now, due to shards going UNASSIGNED after REPLICA_ADDED.

Normally if you have more IO bandwidth it's best just to raise indices.recovery.max_bytes_per_sec.
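For example (adjust the value to what your disks and network can actually sustain; 80mb here is an arbitrary illustration, not a recommendation):

```
PUT _cluster/settings
{
  "persistent": {
    "indices.recovery.max_bytes_per_sec": "80mb"
  }
}
```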

Maybe that's part of the issue?
I wonder why, while heavily indexing (currently only about half way through), we see this, with primary shards being relocated by Elasticsearch:

source shard stats:

"type": "PEER",
  "stage": "TRANSLOG",

destination shard stats:

   "state": "RELOCATING",
   "primary": true,

I do see indexing slowlogs (some bulks of 1,000 docs take more than 30s).