Hey guys,
I'm having some issues with bulk indexing and scan and scroll.
Some background:
I have an Elasticsearch index with around 80 million documents.
My bulk indexing setup consists of 20 "processor" services, each of which loads data from a MySQL database and fires bulk index requests at a 2-node Elasticsearch cluster.
Each of these services uses a BulkProcessor (roughly sketched after this list) with:
- a bulk size of 10MB
- concurrent requests set to 2
- bulk actions set to -1
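In code, that setup looks roughly like this (ES 1.x-style Java client; the listener callbacks are stripped down and the `client` variable is assumed to already exist):

```java
import org.elasticsearch.action.bulk.BulkProcessor;
import org.elasticsearch.action.bulk.BulkRequest;
import org.elasticsearch.action.bulk.BulkResponse;
import org.elasticsearch.common.unit.ByteSizeUnit;
import org.elasticsearch.common.unit.ByteSizeValue;

// client: an org.elasticsearch.client.Client connected to the 2-node cluster
BulkProcessor bulkProcessor = BulkProcessor.builder(client, new BulkProcessor.Listener() {
    @Override
    public void beforeBulk(long executionId, BulkRequest request) { }

    @Override
    public void afterBulk(long executionId, BulkRequest request, BulkResponse response) { }

    @Override
    public void afterBulk(long executionId, BulkRequest request, Throwable failure) { }
})
        .setBulkSize(new ByteSizeValue(10, ByteSizeUnit.MB)) // flush roughly every 10MB
        .setConcurrentRequests(2)                            // up to 2 bulk requests in flight
        .setBulkActions(-1)                                  // no limit on the number of actions
        .build();
```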
So far this gave me the bulk indexing throughput I needed: with a 2-node cluster (16 GB each), ~80 million documents took around 20 minutes.
Each document I store in ES is associated with a specific user accountId of my web application, so I have an accountId field in each document. I don't have a separate index per user because that would leave me with 2000+ indices and types, which leads to cluster state issues.
OK, so far everything is good, and I can reindex each user's documents independently. Recently I added an extra step to my processor service logic: deleting all existing documents in Elasticsearch for that user. So the processor logic now looks like:
- Grab userId from pending list
- Delete all existing documents in ES where account_id = "userId" using scan and scroll + bulk delete calls, IF any documents exist (roughly sketched after this list).
- Reindex everything.
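The delete step does roughly this (simplified sketch; the index name is a placeholder, and in this sketch the hits are just added to the BulkProcessor as delete requests):

```java
import org.elasticsearch.action.delete.DeleteRequest;
import org.elasticsearch.action.search.SearchResponse;
import org.elasticsearch.action.search.SearchType;
import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.index.query.QueryBuilders;
import org.elasticsearch.search.SearchHit;

// Open a scan-and-scroll over this user's documents
SearchResponse scrollResp = client.prepareSearch("my_index")
        .setSearchType(SearchType.SCAN)
        .setScroll(new TimeValue(60000))
        .setQuery(QueryBuilders.termQuery("account_id", userId))
        .setSize(100) // hits per shard per scroll page
        .execute().actionGet();

while (true) {
    scrollResp = client.prepareSearchScroll(scrollResp.getScrollId())
            .setScroll(new TimeValue(60000))
            .execute().actionGet();
    if (scrollResp.getHits().getHits().length == 0) {
        break; // no more matching documents
    }
    // Turn every hit into a delete request; these get flushed as bulk requests
    for (SearchHit hit : scrollResp.getHits()) {
        bulkProcessor.add(new DeleteRequest(hit.getIndex(), hit.getType(), hit.getId()));
    }
}
```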
Now if I have 2000 users and I kick off the reindex process, these 2000 users' documents are processed in parallel by the 20 services, each of which uses the above logic.
When I do this I get:
EsRejectedExecutionException[rejected execution (queue capacity 100) on org.elasticsearch.action.support.replication.TransportShardReplicationOperationAction$AsyncShardOperationAction$1@117b1fdc]
As I understand it, this exception means there are way too many concurrent bulk requests. But WHY did this start happening only after I added the "scan and scroll + bulk delete" step?
Also, if I delete my entire index and start reindexing from scratch, I still run into the same issue. If there is no index there are no documents, so the scan and scroll shouldn't find anything and there should be no bulk delete requests, yet the problem still occurs.
If I remove that step, everything goes back to normal.
So my question is:
Is there an inherent overhead in using a scan and scroll query that causes it to use up the bulk thread pool queue?
One solution could be to just increase threadpool.bulk.queue_size to 300 or 500 or whatever, but I really want to understand this first.
Please let me know if you need more information, any help would be very very much appreciated!
Note: I know that I don't need to delete everything from an index before reindexing, i.e. reindexing will overwrite the existing documents anyway. The reason I need to explicitly delete everything for a user first is that sometimes my ES dataset ends up with more documents than I have in MySQL for that customer. This step just cleans up any ghost documents that were not deleted via the deletion event. It's a long story; I can elaborate if needed.