Hello everyone!
I'm trying to index multiple documents using bulk with C#.
Elasticsearch version 7.5.1
NEST version 7.5.1
First I make a query excluding some fields; this query returns 1937 docs. I'm trying to index the documents from that query response with the bulk API.
Here is the code used:
var searchResponse = client.Search<Dictionary<string, object>>(s => s
    .Index("amazoninf.iica.teste")
    .Source(sf => sf
        .Excludes(e => e.Fields("_d", "_n"))
    )
    .Size(10000)
    .Query(q => q
        .MatchAll()
    )
);

var resultado = searchResponse.Documents;

var bulkIndexResponse = client.Bulk(b => b
    .Index("amazoninf.iica.teste.tmp")
    .IndexMany(resultado)
);
And the response is:
Invalid NEST response built from a unsuccessful (413) low level call on POST: /amazoninf.iica.teste.tmp/_bulk?pretty=true
# Invalid Bulk items:
# Audit trail of this API call:
- [1] BadResponse: Node: http://win-5jrakspu4gn:9200/ Took: 00:00:06.5634864
# OriginalException: Elasticsearch.Net.ElasticsearchClientException: Request failed to execute. Call: Status code 413 from: POST /amazoninf.iica.teste.tmp/_bulk?pretty=true
# Request:
<Request stream not captured or already read to completion by serializer. Set DisableDirectStreaming() on ConnectionSettings to force it to be set on the response.>
# Response:
<Response stream not captured or already read to completion by serializer. Set DisableDirectStreaming() on ConnectionSettings to force it to be set on the response.>
The request failed to execute and returned an HTTP 413 response, which is 413 Payload Too Large.
So, it looks like the bulk request body is too big for the instance that you're running against to handle.
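As an aside, the audit trail shows that the request and response streams weren't captured. To see the actual request body the next time a call fails, you can set DisableDirectStreaming() on ConnectionSettings, as the message suggests. A minimal sketch, using the node address from your audit trail (this buffers request/response bytes in memory, so it's best used for diagnostics only):

var settings = new ConnectionSettings(new Uri("http://win-5jrakspu4gn:9200"))
    .DisableDirectStreaming(); // <-- capture request/response bytes on the response

var client = new ElasticClient(settings);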
One way to handle the 413 would be to use BulkAll and specify parameters that send smaller bulk requests:
var bulkAll = client.BulkAll(resultado, b => b
    .Index("amazoninf.iica.teste.tmp")
    .Size(100) // <-- number of docs to send in each bulk request
    .MaxDegreeOfParallelism(4) // <-- number of concurrent bulk requests to send
);

// alternatively to Wait(), create a BulkAllObserver and subscribe it to bulkAll
bulkAll.Wait(TimeSpan.FromMinutes(10), r =>
{
    // do something on each bulk response
});
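For completeness, here's a minimal sketch of the BulkAllObserver alternative mentioned in the comment above; the handler bodies are placeholders to fill in:

var observer = new BulkAllObserver(
    onNext: r =>
    {
        // called for each successful bulk response
    },
    onError: e =>
    {
        // called if the process halts with an exception
    },
    onCompleted: () =>
    {
        // called when all bulk requests have finished
    }
);

bulkAll.Subscribe(observer);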
A better way, however, would be to use the Reindex API. This way, the documents won't need to be pulled across the wire only to be sent back over again; Elasticsearch will internally reindex the documents into another index.
var sourceIndex = "amazoninf.iica.teste";
var destinationIndex = "amazoninf.iica.teste.tmp";

var getIndexResponse = client.Indices.Get(sourceIndex);
var indexState = getIndexResponse.Indices[sourceIndex];

// create the destination index with the same settings as the source index.
// This isn't needed if it already exists.
var createIndexResponse = client.Indices.Create(destinationIndex, c => c
    .InitializeUsing(indexState)
);

var reindexOnServerResponse = client.ReindexOnServer(r => r
    .Source(s => s
        .Index(sourceIndex)
        .Size(10000)
        .Query<object>(q => q
            .MatchAll()
        )
    )
    .Destination(d => d
        .Index(destinationIndex)
    )
    .WaitForCompletion(true) // <-- whether the request should wait for the operation to
                             //     complete. Can set to false and use the task id to monitor.
);
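If you set WaitForCompletion(false), the response exposes a task id that can be polled with the Tasks API. A rough sketch of that monitoring loop (the 5 second poll interval is an arbitrary choice):

// when started with .WaitForCompletion(false), poll the task until it finishes
var taskId = reindexOnServerResponse.Task;

GetTaskResponse taskResponse;
do
{
    Thread.Sleep(TimeSpan.FromSeconds(5));
    taskResponse = client.Tasks.GetTask(taskId);
} while (!taskResponse.Completed);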
Note that the source excludes are not included here; the client only supports a list of fields to include. I'm looking into whether it should support includes/excludes.
Reindex is an interesting option, but I do some processing before indexing.
In the future this processing will live in a plugin inside Elasticsearch, and then I will use reindex.
Use the Reindex helper on the client. In contrast to ReindexOnServer, Reindex will pull documents down using an observable ScrollAll helper, create the destination index if need be, and bulk index the documents using BulkAll, as shown in the previous answer. Reindex is a client-specific helper that existed before the reindex API was added to Elasticsearch (the API is what ReindexOnServer maps to). An example would look something like this:
var client = new ElasticClient();

var numberOfSlices = 2; // <-- see the scroll docs for what to set this to

// change the ILazyDocument type to whichever type makes sense for your data
var reindexObservable = client.Reindex<ILazyDocument>(r => r
    .BackPressureFactor(10)
    .ScrollAll("1m", numberOfSlices, s => s
        .Search(ss => ss
            .Index("amazoninf.iica.teste")
            .Source(sf => sf
                .Excludes(e => e.Fields("_d", "_n"))
            )
            .Query(q => q
                .MatchAll()
            )
        )
        .MaxDegreeOfParallelism(4) // <-- how many concurrent scroll calls
    )
    .BulkAll(b => b
        .Index("amazoninf.iica.teste.tmp")
        .Size(100) // <-- number of docs in each bulk call
        .MaxDegreeOfParallelism(2) // <-- how many concurrent bulk calls
        .RefreshOnCompleted()
    )
);

var waitHandle = new ManualResetEvent(false);
Exception exception = null;

// actions to take whilst observing the reindex process
var reindexObserver = new ReindexObserver(
    onError: e =>
    {
        exception = e;
        waitHandle.Set();
    },
    onCompleted: () => waitHandle.Set()
);

// start the observable process
reindexObservable.Subscribe(reindexObserver);

// blocking wait for the reindexing process; optionally pass a timeout
waitHandle.WaitOne();

// did the observable end in exception?
if (exception != null)
    throw exception;
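Since you mention having some processing to do before indexing: if memory serves, the Reindex helper also has a two-type form, Reindex<TSource, TTarget>, with a Map function that transforms each document between the scroll and the bulk calls, which may fit your case. A rough sketch, where SourceDoc, TargetDoc and Transform are hypothetical stand-ins for your own types and processing:

// SourceDoc, TargetDoc and Transform are placeholders for your types/processing
var processedReindex = client.Reindex<SourceDoc, TargetDoc>(r => r
    .ScrollAll("1m", numberOfSlices, s => s
        .Search(ss => ss
            .Index("amazoninf.iica.teste")
        )
    )
    .Map(sourceDoc => Transform(sourceDoc)) // <-- your per-document processing
    .BulkAll(b => b
        .Index("amazoninf.iica.teste.tmp")
        .Size(100)
    )
);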