How to check/wait until all outstanding bulk (index) operations are complete?

Is there some way to wait for all outstanding indexing operations to complete, across the entire index? The refresh=wait_for parameter only waits for the documents modified by that particular request, and only on the shards that request wrote to... I need to wait for all writes index-wide.

Here's what I'm doing. I'm syncing a large data set, which takes many hours. I take a timestamp before I start my sync, then I spin through all the records from the primary store, inserting them into Elasticsearch and writing each record with this timestamp. On completion I issue a delete_by_query to delete any records whose timestamp is less than the timestamp of this sync (to remove any old records which are no longer present in the primary store). This delete_by_query very often fails with conflicts.

I could issue all of my bulk requests with refresh=wait_for, but that makes the whole operation take several hours longer, so I'd rather not do that. If I just issue the final call with refresh=wait_for, it only applies to the records (and shards) modified in that call specifically. Is there any way I can just wait for all outstanding writes, index-wide, before issuing my delete_by_query?
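For concreteness, the cleanup call at the end looks roughly like this (the index name, timestamp field and value here are just placeholders for my real ones):

```
# delete anything not touched by the current sync
POST my-index/_delete_by_query
{
  "query": {
    "range": {
      "last_synced": {
        "lt": 1714500000000
      }
    }
  }
}
```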

It's possible that simply retrying the delete_by_query until it succeeds would actually do what I want... but what happens if the delete_by_query executes and succeeds on a document which has an outstanding index request? Will the index request re-insert the document? I kind of doubt it... at the very least it seems like sketchy behavior to rely on to guarantee consistent data. Any insight on this is greatly appreciated :slight_smile:

My suggestion would be not to use the delete by query approach.

As you're batch-syncing the primary store to Elasticsearch, I would suggest to:

1. Create an alias pointing to the index with the data you have now.
2. Write the data to a dedicated index named after the sync job's timestamp (or the day it started). To go faster, you can even disable the refresh interval and re-enable it only at the end of the indexing.
3. When the indexing ends, switch the alias to point to the new index, and you can then delete the old index with a single delete index call.

The clients need to use the alias as the entry point for their queries.
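A minimal sketch of that flow, with made-up index and alias names:

```
# write the new sync into a fresh, timestamped index
PUT my-data-2024.05.02

# when indexing is done, flip the alias atomically
POST _aliases
{
  "actions": [
    { "remove": { "index": "my-data-2024.05.01", "alias": "my-data" } },
    { "add": { "index": "my-data-2024.05.02", "alias": "my-data" } }
  ]
}

# then drop the old index in one call
DELETE my-data-2024.05.01
```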

That is a good idea; however, it won't work for me because this batch sync is not the only thing in the index - there are a whole lot of other documents in the index from other sources which are not touched by this process, and it would not be practical to rebuild the entire index every time one of these syncs runs. These batch syncs will likely run a couple of times a day, but rebuilding the whole index would take a few days...

Thank you for the additional details.

You could put the "static documents" in one index and the dynamic ones in another, make the alias point to all of those indices, and change the pointers dynamically.

I feel massive deletes by query are suboptimal in your case.

Actually, an alias is not mandatory, as you can query a list of indices (e.g. myindex1,myindex2,anotherindex).
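For example (index names are placeholders):

```
# search a static index and a per-sync index in one request
GET static-index,dynamic-index-2024.05.02/_search
{
  "query": { "match_all": {} }
}
```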

When initially planning this we considered separate indexes per "data set", but the number of data sets was significantly higher than the recommended number of indexes in a cluster (high hundreds to low thousands). Not all of the "data sets" are that large, but they change over time and there's no real way to predict which ones will grow that large ahead of time, so segmenting them into multiple shared indexes would be quite complex.

It's probably worth mentioning that deletes are rare - it's quite uncommon for a document to be deleted as a result of the sync, but I nevertheless have to handle it when it does happen. It seems very inefficient to fully regenerate indexes on every sync when >99% of the time the sync is insert- or update-only... the only reason it's trying to delete records regularly right now is that the bulk call which updates the timestamp is queued up in Elasticsearch and hasn't been processed yet. At this point I'm leaning towards just adding a delay where I sleep for, say, 15 seconds and then issue the delete_by_query.

With Index Lifecycle Management you can automatically roll over the index you're writing to based on its size or number of documents. An alternative is to use the rollover API directly.
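For example, a manual rollover call against a write alias (the alias name and thresholds here are just illustrative):

```
# creates a new backing index and moves the write alias
# once either condition is met
POST my-data-write/_rollover
{
  "conditions": {
    "max_docs": 50000000,
    "max_age": "7d"
  }
}
```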

Ok the workflow is clear now and I understand.

I am tempted to suggest the following.
Instead of actually deleting the documents, set a special field to mark a document as deleted and use optimistic locking (versioning) to ensure the "latest version" is kept.
Then use a filtered alias when searching, where the filter excludes the soft-deleted documents, or include that filter in all the queries.
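A sketch of such a filtered alias (the deleted field and index/alias names are made up):

```
# searches through my-data-live never see soft-deleted documents
POST _aliases
{
  "actions": [
    {
      "add": {
        "index": "my-data",
        "alias": "my-data-live",
        "filter": {
          "bool": {
            "must_not": { "term": { "deleted": true } }
          }
        }
      }
    }
  ]
}
```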

Ooooh, that sounds promising... I have no idea what optimistic lock versioning is, so I'll need to read up on exactly how that works... but thanks, it seems like this could work :slight_smile:

@redec versioning should work if you can provide a "version" to each document you're indexing: https://www.elastic.co/guide/en/elasticsearch/reference/current/docs-bulk.html#bulk-versioning
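For example, using the sync timestamp as an external version in the bulk actions (index, field names and values are illustrative):

```
# with version_type=external, writes carrying a version lower than
# or equal to the stored one are rejected with a conflict
POST _bulk
{ "index": { "_index": "my-data", "_id": "doc-1", "version": 1714500000000, "version_type": "external" } }
{ "name": "some record", "last_synced": 1714500000000 }
{ "index": { "_index": "my-data", "_id": "doc-2", "version": 1714500000000, "version_type": "external" } }
{ "name": "another record", "last_synced": 1714500000000 }
```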

Ok, so just making sure I understand: once my bulk update completes, I would query for all documents where the timestamp is less than the current sync... then for each of those I would issue an update to set the soft-delete flag (with version=whatever) to ensure I'm not "deleting" anything which has been processed in the meantime. Then, as the indexing backlog clears, I may have "deleted" some of those docs already, but the index operation would overwrite them back to not-deleted. Is that the general idea?
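If so, I'm guessing something like an update by query could do that in one shot (field names made up, and this is just my guess at it):

```
# conflicts=proceed skips any document that was re-indexed
# after the search snapshot instead of failing the request
POST my-data/_update_by_query?conflicts=proceed
{
  "query": {
    "range": { "last_synced": { "lt": 1714500000000 } }
  },
  "script": {
    "source": "ctx._source.deleted = true"
  }
}
```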

Sorry @redec, maybe I am misunderstanding the problem.

You are bulk indexing documents in batches composed of several bulk requests.
When doing this, you timestamp the documents.

Once the batch indexing is completed you start a delete by query to delete documents having a timestamp corresponding to previous runs of the indexer.

No, the refresh works at the index level: it writes the in-memory indexing buffer out as new segments and opens them for search.

You can force a refresh once you send the last bulk request of a batch with POST indexname/_refresh.

Once you do the refresh, the delete by query should run without issues.
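In other words, at the end of the batch, something along these lines (index and field names are illustrative):

```
# make everything indexed so far visible to search
POST my-data/_refresh

# then remove documents left over from previous syncs
POST my-data/_delete_by_query
{
  "query": {
    "range": { "last_synced": { "lt": 1714500000000 } }
  }
}
```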

Also: indexing is real time. Direct access by id is real time. Search is near real time: it searches only segments that are open for search, meaning those which have been hit by a refresh (explicit or implicit).

Ooooooh, ok... so a POST to _refresh will wait for the bulk input buffers on all shards/nodes to flush? Sorry, I'm pretty new to Elasticsearch, so the internals are a little fuzzy to me... I'll give that a shot, thanks! :slight_smile:
