Hello, I am using the scroll API in NEST 5.5 to retrieve over 1.3 million documents (6 fields per document).
My scroll code works fine when I retrieve a smaller number of documents (around 800,000), but it fails when I try to retrieve all 1.3 million, and I don't know what is going wrong. I am running this from C# in Visual Studio, and also as a Web API hosted as a microservice on a Service Fabric app; it fails in both places. In Visual Studio I get an "unexpected ES exception" error, sometimes "web request aborted", and sometimes "search context lost", even though I pass the new scroll id on each loop iteration. My code is below. Please help me; it's urgent.
```csharp
ISearchResponse<MyDocument> initialResponse = this.ElasticClient.Search<MyDocument>(scr => scr
    .Index(indexName)
    .From(0)
    .Take(scrollSize)
    .MatchAll()
    .Scroll(scrollTimeout)
    .SearchType(dfs));

List<MyDocument> results = new List<MyDocument>();

if (!initialResponse.IsValid || string.IsNullOrEmpty(initialResponse.ScrollId))
    throw new Exception(initialResponse.ServerError.Error.Reason);

if (initialResponse.Documents.Any())
    results.AddRange(initialResponse.Documents);

string scrollid = initialResponse.ScrollId;
bool isScrollSetHasData = true;

while (isScrollSetHasData)
{
    ISearchResponse<MyDocument> loopingResponse = this.ElasticClient.Scroll<MyDocument>(scrollTimeout, scrollid);
    if (loopingResponse.IsValid)
    {
        results.AddRange(loopingResponse.Documents);
        scrollid = loopingResponse.ScrollId;
    }
    isScrollSetHasData = loopingResponse.Documents.Any();
}

this.ElasticClient.ClearScroll(new ClearScrollRequest(scrollid));
```
I've formatted your code this time; please use the </> button to format your code in future, or surround it in ``` (triple backticks), with an optional language after the first set. Unformatted code is much harder to read, which makes it harder to help.
Have you seen the ScrollAll() helper method? I think this would fit well with your use case:
```csharp
var index = "my_index_name";
var numberOfShards = 3;
var seenDocuments = 0;
var documents = new ConcurrentBag<IReadOnlyCollection<MyDocument>>();

client.ScrollAll<MyDocument>("1m", numberOfShards, s => s
    .MaxDegreeOfParallelism(numberOfShards / 2)
    .Search(search => search
        .Index(index)
        .AllTypes()
        .MatchAll()
    )
).Wait(TimeSpan.FromMinutes(5), r =>
{
    documents.Add(r.SearchResponse.Documents);
    Interlocked.Add(ref seenDocuments, r.SearchResponse.Hits.Count);
});
```
ScrollAll() sends concurrent scroll requests using sliced scrolling, and the TimeSpan passed to .Wait() controls the overall time to wait for all documents to be fetched. Using ScrollAll(), you can process documents as they are scrolled.
You can have more control by subscribing to the observable directly:
```csharp
var index = "my_index_name";
var numberOfShards = 3;
var seenDocuments = 0;
var documents = new ConcurrentBag<IReadOnlyCollection<MyDocument>>();

var observable = client.ScrollAll<MyDocument>("1m", numberOfShards, s => s
    .MaxDegreeOfParallelism(numberOfShards / 2)
    .Search(search => search
        .Index(index)
        .AllTypes()
        .MatchAll()
    )
);

var waitHandle = new ManualResetEvent(false);
Exception exception = null;

observable.Subscribe(new ScrollAllObserver<MyDocument>(
    onNext: r =>
    {
        documents.Add(r.SearchResponse.Documents);
        Interlocked.Add(ref seenDocuments, r.SearchResponse.Hits.Count);
    },
    onError: e =>
    {
        exception = e;
        waitHandle.Set();
    },
    onCompleted: () => waitHandle.Set()
));

waitHandle.WaitOne();

if (exception != null)
    throw exception;
```
Thanks a lot, Russ, this is great. But I don't understand how it scrolls in batches. Is it trying to pull all the data at once? If so, won't it put extra load on the ES cluster?
Can you please look at the code below that I am trying to execute? Do you think it will work for extracting over a million documents?
```csharp
var index = "index name";
var numberOfShards = 5;
var seenDocuments = 0;
var documents = new ConcurrentBag<IReadOnlyCollection<object>>();

client.ScrollAll<object>("5m", numberOfShards, g => g
    .MaxDegreeOfParallelism(numberOfShards / 2)
    .Search(scr => scr
        .Index(DefaultIndex)
        .From(0)
        .Take(30000)
        .Source(r => r.Includes(i => i.Fields(Fields)))
        .Query(q => q
            .Nested(n => n
                .Path("A_field_name")
                .Query(q1 => q1.MatchAll())
            )
        )
        .Type("punchouts")
        // .SearchType(Elasticsearch.Net.SearchType.DfsQueryThenFetch)
        .Scroll("15m"))
).Wait(TimeSpan.FromMinutes(5), z =>
{
    documents.Add(z.SearchResponse.Documents);
    Interlocked.Add(ref seenDocuments, z.SearchResponse.Hits.Count);
});
```
Do you think this will work fine? If not, can you please help me figure out an optimal solution for this?
Russ, I did try both methods and they seem to work on a dataset of a few hundred thousand. One question: why are the documents stored as a collection of document batches? And what is the relevance of the number of shards?
Finally, will this work for fetching over a million documents? (1 doc = 6 fields)
The ScrollAllObservable that ScrollAll() uses takes advantage of Sliced Scrolling in Elasticsearch's Scroll API to split the scroll into a number of slices that can be processed concurrently.
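For illustration only, here is roughly what a single slice looks like as a plain scrolling search; ScrollAll runs one such scroll stream per slice id for you. This is a minimal sketch assuming NEST 5.x's .Slice() support, with placeholder index name and slice counts:

```csharp
// Sketch of one slice of a sliced scroll. ScrollAll issues one of these
// per slice id (0..max-1) and scrolls each slice concurrently.
var sliceResponse = client.Search<MyDocument>(s => s
    .Index("my_index_name")
    .Slice(sl => sl
        .Id(0)    // this slice's id
        .Max(3))  // total number of slices, commonly the number of shards
    .MatchAll()
    .Scroll("1m"));

// Subsequent pages for this slice come from the usual scroll call:
// var next = client.Scroll<MyDocument>("1m", sliceResponse.ScrollId);
```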
No, it is still issuing scroll requests to scroll through all of the documents that satisfy the query. Within the onNext delegate of ScrollAllObserver<T>, you decide what to do with the documents returned in each scroll response. To reduce the memory footprint, you can process documents in this delegate as they are returned, rather than collecting them all first.
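As a sketch of that idea (ProcessBatch is a hypothetical stand-in for whatever sink you use, such as writing to a file or pushing to a queue; index and numberOfShards are as in the earlier example):

```csharp
client.ScrollAll<MyDocument>("1m", numberOfShards, s => s
        .MaxDegreeOfParallelism(numberOfShards / 2)
        .Search(search => search
            .Index(index)
            .AllTypes()
            .MatchAll()))
    .Wait(TimeSpan.FromMinutes(30), r =>
    {
        // handle each page as it arrives rather than buffering
        // 1.3 million documents in a collection in memory
        ProcessBatch(r.SearchResponse.Documents); // hypothetical method
    });
```

Note that with MaxDegreeOfParallelism greater than 1, the delegate can be invoked from multiple slices concurrently, so whatever it does should be thread-safe (which is why the earlier examples use ConcurrentBag and Interlocked).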
No more load than issuing scroll requests using the Scroll API.
"should work fine" is very subjective This will scroll through a million documents, and if you need to retrieve that many documents, the scroll API is a good candidate for doing this.
Oops, you tricked me. Which one is better: ScrollAll or Scroll?
**Scroll => (which I originally used)**
```csharp
ISearchResponse<MyDocument> initialResponse = this.ElasticClient.Search<MyDocument>(scr => scr
    .Index(indexName)
    .From(0)
    .Take(scrollSize)
    .MatchAll()
    .Scroll(scrollTimeout)
    .SearchType(dfs));

List<MyDocument> results = new List<MyDocument>();

if (!initialResponse.IsValid || string.IsNullOrEmpty(initialResponse.ScrollId))
    throw new Exception(initialResponse.ServerError.Error.Reason);

if (initialResponse.Documents.Any())
    results.AddRange(initialResponse.Documents);

string scrollid = initialResponse.ScrollId;
bool isScrollSetHasData = true;

while (isScrollSetHasData)
{
    ISearchResponse<MyDocument> loopingResponse = this.ElasticClient.Scroll<MyDocument>(scrollTimeout, scrollid);
    if (loopingResponse.IsValid)
    {
        results.AddRange(loopingResponse.Documents);
        scrollid = loopingResponse.ScrollId;
    }
    isScrollSetHasData = loopingResponse.Documents.Any();
}

this.ElasticClient.ClearScroll(new ClearScrollRequest(scrollid));
```
Or this one? **ScrollAll => (which you suggested)**
```csharp
var index = "my_index_name";
var numberOfShards = 3;
var seenDocuments = 0;
var documents = new ConcurrentBag<IReadOnlyCollection<MyDocument>>();

client.ScrollAll<MyDocument>("1m", numberOfShards, s => s
    .MaxDegreeOfParallelism(numberOfShards / 2)
    .Search(search => search
        .Index(index)
        .AllTypes()
        .MatchAll()
    )
).Wait(TimeSpan.FromMinutes(5), r =>
{
    documents.Add(r.SearchResponse.Documents);
    Interlocked.Add(ref seenDocuments, r.SearchResponse.Hits.Count);
});
```
I'd suggest trying both approaches and seeing which one works better for you. I would expect ScrollAll to perform better, since it scrolls concurrently. ScrollAll is provided as a helper with NEST so that you don't need to write a concurrent scroll implementation yourself.
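Whichever you choose, one tweak to the manual Scroll loop above is worth considering: as written, an invalid scroll response silently ends the loop, which can make failures like "search context lost" look like a short result set. A sketch using the same NEST calls and variables as your original code:

```csharp
while (isScrollSetHasData)
{
    ISearchResponse<MyDocument> loopingResponse =
        this.ElasticClient.Scroll<MyDocument>(scrollTimeout, scrollid);

    if (!loopingResponse.IsValid)
        // surface failures instead of silently ending the loop; an expired
        // scroll window (scrollTimeout too short for the time between
        // scroll calls) shows up here as "search context lost"
        throw new Exception(loopingResponse.ServerError?.Error?.Reason ?? "scroll request failed");

    results.AddRange(loopingResponse.Documents);
    scrollid = loopingResponse.ScrollId;
    isScrollSetHasData = loopingResponse.Documents.Any();
}
```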