I've formatted your code this time; please use the </>
button to format your code in future, or surround in ``` (triple backticks) with optional language after the first block. Trying to read unformatted code makes it much harder to help
Have you seen the ScrollAll()
helper method? I think this would fit well with your use case:
var index = "my_index_name";
var numberOfShards = 3;
var seenDocuments = 0;
var documents = new ConcurrentBag<IReadOnlyCollection<MyDocument>>();
client.ScrollAll<MyDocument>("1m", numberOfShards, s => s
.MaxDegreeOfParallelism(numberOfShards / 2)
.Search(search => search
.Index(index)
.AllTypes()
.MatchAll()
)
).Wait(TimeSpan.FromMinutes(5), r =>
{
documents.Add(r.SearchResponse.Documents);
Interlocked.Add(ref seenDocuments, r.SearchResponse.Hits.Count);
});
ScrollAll()
can send concurrent scroll requests using slicing, and you can adjust the overall time to wait to fetch all documents with the TimeSpan
passed to .Wait()
. Using ScrollAll()
, you can process documents as they are scrolled.
You can have more control with
var index = "my_index_name";
var numberOfShards = 3;
var seenDocuments = 0;
var documents = new ConcurrentBag<IReadOnlyCollection<MyDocument>>();
var observable = client.ScrollAll<MyDocument>("1m", numberOfShards, s => s
.MaxDegreeOfParallelism(numberOfShards / 2)
.Search(search => search
.Index(index)
.AllTypes()
.MatchAll()
)
);
var waitHandle = new ManualResetEvent(false);
Exception exception = null;
observable.Subscribe(new ScrollAllObserver<MyDocument>(
onNext: r =>
{
documents.Add(r.SearchResponse.Documents);
Interlocked.Add(ref seenDocuments, r.SearchResponse.Hits.Count);
},
onError: e =>
{
exception = e;
waitHandle.Set();
},
onCompleted: () => waitHandle.Set()
));
waitHandle.WaitOne();
if (exception != null)
throw exception;