EsRejectedExecutionException during bulk indexing due to scan and scroll

Hey guys,

I'm having some issues with bulk indexing and scan and scroll.

Some background:
I have an Elasticsearch index with around 80 million documents.

My bulk indexing setup consists of 20 "processor" services, each of which loads data from a MySQL database and fires bulk index requests at a 2-node Elasticsearch cluster.
Each of these services uses a BulkProcessor with:

  • a bulk size of 10MB
  • concurrent requests set to 2
  • bulk actions set to -1

So far this gave me the bulk indexing throughput I needed. With a 2-node cluster (16 GB each), indexing the ~80 million documents took around 20 minutes.

Each document I'm storing in ES is associated with a specific user accountId from my web application, so I have an accountId field in each document. I don't have a separate index per user because that would leave me with 2000+ indices and types, which leads to cluster-state issues.

OK, so far everything is good. I have the capability of reindexing each user's documents independently. Recently I added an extra step to my processor service logic: delete all existing documents in Elasticsearch for that user. So the processor logic now looks like:

  1. Grab userId from pending list
  2. Delete all existing documents in ES where account_id = "userId" using scan and scroll + bulk delete calls IF any documents exist.
  3. Reindex everything.
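For step 2, the bulk delete bodies are built from the scan hits; a minimal sketch of that translation (a pure helper, where the hit dicts mirror the fields a scroll page returns):

```python
def to_delete_actions(hits):
    """Turn one page of scan/scroll hits into bulk 'delete' action lines.
    Each hit is expected to carry _index, _type and _id, which is all a
    delete action needs -- no document body required."""
    return [
        {"delete": {"_index": h["_index"], "_type": h["_type"], "_id": h["_id"]}}
        for h in hits
    ]
```

The resulting list would be fed to the same BulkProcessor that handles the reindex step.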

Now if I have 2000 users and I kick off the reindex process, these 2000 users' documents will be processed in parallel by the 20 services, each using the logic above.

When I do this I get:
EsRejectedExecutionException[rejected execution (queue capacity 100) on$AsyncShardOperationAction$1@117b1fdc]

According to my understanding, this exception means there are way too many concurrent bulk indexing requests. But WHY did this start happening after I added the "scan and scroll + bulk delete" step?

Also, if I delete my entire index and start reindexing from scratch, I still run into the same issue. If there is no index, there are no documents, so the scan and scroll shouldn't find anything and there should be no bulk delete requests, yet I still hit the same problem.

If I remove that step, everything goes back to normal.

So my question is:
Is there an inherent overhead in using a scan and scroll query that causes it to fill up the bulk thread pool queue?

One solution could be to just increase threadpool.bulk.queue_size to 300 or 500 or whatever, but I really want to understand this.
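A back-of-envelope calculation suggests the original setup already sits near that queue capacity, so even a small amount of extra load could tip it over. The shard and node counts below are assumptions (5 primaries was the old default; the capacity of 100 comes from the error message):

```python
# Assumed numbers: 5 primary shards per index, 2 data nodes, and the
# queue capacity of 100 reported in the rejection message.
services = 20            # "processor" services
concurrent_requests = 2  # per BulkProcessor
shards = 5               # assumed primaries
nodes = 2

bulk_requests_in_flight = services * concurrent_requests   # 40
# each bulk request is split into one shard-level task per shard it touches
shard_tasks = bulk_requests_in_flight * shards             # up to 200
tasks_per_node = shard_tasks // nodes                      # ~100 per node

print(tasks_per_node)  # already at the queue capacity of 100
```

If that estimate is in the right ballpark, the extra delete traffic (or even just the added latency of the scroll round trips) would be enough to push shard-level tasks past the queue limit.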

Please let me know if you need more information, any help would be very very much appreciated!

Note: I know that I don't need to delete everything from an index before reindexing, i.e. reindexing will overwrite the existing documents anyway. The reason I need to explicitly delete everything for a user first is that sometimes my ES dataset ends up with more documents than I have in MySQL for that customer. This step just cleans up any ghost documents that were not removed via the deletion event. It's a long story; I can elaborate if needed.

I've done this kind of thing before using a parallel walker rather than
bulk delete and reindex, simply because I don't want to have blips. Your
strategy should work, but there are issues with it.

So scrolling forces Elasticsearch to hold open a Lucene reader, keeping a
consistent view of the data. This is usually not a big deal; all searches
do it between the search and fetch phases, but scrolling holds it open
longer: until the timeout expires, you delete the scroll, or you scroll
off the end.
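In REST form, that lifecycle looks roughly like this (index name, field, and scroll id are illustrative, and the exact endpoints vary a little between versions):

```
# open a scroll; the reader is kept alive for 1 minute per round trip
POST /myindex/_search?scroll=1m
{ "query": { "term": { "account_id": "user-42" } } }

# fetch the next page, renewing the 1 minute timeout each time
POST /_search/scroll?scroll=1m
{ "scroll_id": "<id from the previous response>" }

# release the reader early instead of waiting for the timeout
DELETE /_search/scroll
{ "scroll_id": "<id>" }
```

Skipping that last call means every scroll holds its reader until the timeout, which is one way idle scrolls can pile up.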

It's hard to tell what is happening from your story, though. It could be
that you are holding open enough scrolls to gum things up. It could be
that indexing is getting slow because you've filled the index with
deleted documents, which take a long time to merge out, though that
seems less likely given that it also happens on an empty index. So I
wonder if you changed something else? Maybe the scrolls aren't being
closed? Are you seeing any logs about merge throttling?

In the JDBC importer I also face the challenge of stale documents. My advice is to avoid scan/scroll deletes at all costs. Instead, recreate the indices from scratch without replicas, and add the replicas later.

Why no replicas? Scan/scroll must retain a multitude of cursors if you use replicas. Replica operations are synchronized with the primary operation; they are not linearizable. So they take up extra space in the queue.

You have several options.

  • reduce the replica level before doing the bulk, and increase it again afterwards

  • reduce the 20 services to (20 / replica level)

  • ramp up (replica level × current node count) additional nodes
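The first option maps to two settings calls around the bulk run (the index name here is illustrative):

```
PUT /myindex/_settings
{ "index": { "number_of_replicas": 0 } }

# ... run the bulk reindex ...

PUT /myindex/_settings
{ "index": { "number_of_replicas": 1 } }
```

With zero replicas, each bulk request only produces primary-shard tasks; restoring the replicas afterwards copies the finished segments over instead of replaying every operation.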

Thank you for your quick response.

Yes, exactly. I thought scrolls might be causing it, but that doesn't make sense when there's nothing to delete, because it DOES happen even with an empty index. The scroll timeout is 1 minute at the moment; I've varied it between 10 minutes and 1 minute and can see no difference whatsoever.

I am seeing no logs about merge throttling, just this rejection exception.

Is there an alternative way of deleting documents without scan and scroll? I know about the Delete by Query API, but I heard it has safety issues and is also deprecated.

@am87: Did you find the solution for this problem? It seems that I came across the same issue. Any ideas?

Not a solution really, more of a workaround, I'd say.

What I ended up doing was reduce the scroll timeout to the bare minimum, which came down to 1 minute in my case.

And I added some additional logic to remove the need for a full cleanup before every full reindex. This logic detects ghost documents and only triggers the cleanup activity if they're present.

This resulted in fewer scan and scroll calls, and I think the reduced timeout also helped.

Hope this helps :slight_smile: