Using bulk index

Hello everyone!
I'm trying to index multiple documents using bulk with c#.

Elastic version 7.5.1
Nest version 7.5.1

First I make a query excluding some fields; this query returns 1937 docs. I'm trying to index the response from this query with bulk.

Here is the code used:

    var searchResponse = client.Search<Dictionary<string, object>>(s => s
            .Index("amazoninf.iica.teste")
            .Source(sf => sf
                .Excludes(e => e.Fields("_d", "_n"))
                )
            .Size(10000)
            .Query(q => q
            .MatchAll()

            ));

    var resultado = searchResponse.Documents;

    var bulkIndexResponse = client.Bulk(b => b
        .Index("amazoninf.iica.teste.tmp")
        .IndexMany(resultado));

And the response is:

    Invalid NEST response built from a unsuccessful (413) low level call on POST: /amazoninf.iica.teste.tmp/_bulk?pretty=true
    # Invalid Bulk items:
    # Audit trail of this API call:
     - [1] BadResponse: Node: http://win-5jrakspu4gn:9200/ Took: 00:00:06.5634864
    # OriginalException: Elasticsearch.Net.ElasticsearchClientException: Request failed to execute. Call: Status code 413 from: POST /amazoninf.iica.teste.tmp/_bulk?pretty=true
    # Request:
    <Request stream not captured or already read to completion by serializer. Set DisableDirectStreaming() on ConnectionSettings to force it to be set on the response.>
    # Response:
    <Response stream not captured or already read to completion by serializer. Set DisableDirectStreaming() on ConnectionSettings to force it to be set on the response.>

Regards,

Alder

The request failed to execute and returned an HTTP 413 response, which is 413 Payload Too Large.

So, it looks like the bulk request body is too big for the instance that you're running against to handle.
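For reference, the maximum request body size Elasticsearch accepts is governed by the http.max_content_length setting, which defaults to 100mb. It can be raised in elasticsearch.yml (the value below is illustrative only), although sending smaller bulk requests is usually the better fix:

    http.max_content_length: 200mb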

One way to handle this would be to use BulkAll and specify parameters that send smaller bulk requests:

    var bulkAll = client.BulkAll(resultado, b => b
        .Index("amazoninf.iica.teste.tmp")
        .Size(100) // <-- number of docs to send in each bulk request
        .MaxDegreeOfParallelism(4) // <-- number of concurrent bulk requests to send
    );

    // alternatively, you can create a BulkAllObserver and subscribe it to bulkAll
    bulkAll.Wait(TimeSpan.FromMinutes(10), r =>
    {
        // do something on each bulk response
    });
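As a sketch of that BulkAllObserver alternative (assuming the same bulkAll variable as above; the delegates are the onNext/onError/onCompleted parameters of NEST's BulkAllObserver constructor):

    var waitHandle = new ManualResetEvent(false);
    Exception exception = null;

    var observer = new BulkAllObserver(
        onNext: r =>
        {
            // called once for each bulk response
        },
        onError: e =>
        {
            exception = e;
            waitHandle.Set();
        },
        onCompleted: () => waitHandle.Set()
    );

    bulkAll.Subscribe(observer);

    // block until all bulk requests have completed, or one has errored
    waitHandle.WaitOne();

    if (exception != null)
        throw exception;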

A better way, however, would be to use the Reindex API. This way, the documents won't need to be pulled across the wire only to be sent back over again; Elasticsearch will internally reindex documents into the other index.

    var sourceIndex = "amazoninf.iica.teste";
    var destinationIndex = "amazoninf.iica.teste.tmp";
    var getIndexResponse = client.Indices.Get(sourceIndex);
    var indexState = getIndexResponse.Indices[sourceIndex];

    // create the destination index with the same settings as the source index.
    // You won't need to do this if it already exists
    var createIndexResponse = client.Indices.Create(destinationIndex, c => c
        .InitializeUsing(indexState)
    );

    var reindexOnServerResponse = client.ReindexOnServer(r => r
        .Source(s => s
            .Index(sourceIndex)
            .Size(10000)
            .Query<object>(q => q
                .MatchAll()
            )
        )
        .Destination(d => d
            .Index(destinationIndex)
        )
        .WaitForCompletion(true) // <-- whether the request should wait for the operation to
                                 //     complete. Can set to false and use the task id to monitor.
    );

Note that the source excludes are not included here; the client currently only supports a list of fields to include. I'm looking into whether it should support includes/excludes.

Thank you very very much

Reindex is an interesting option, but I do some processing before indexing.
In the future this processing will live in a plugin inside Elasticsearch, and then I will use reindex.

Thanks again,

Alder

If you need to perform some processing on the docs, you may want to take a look at a couple of options:

  1. It may be that the processing you need to do can be achieved in an ingest node processor pipeline. With this approach, you define a processor pipeline and then add it as the default pipeline for the destination index, or specify the pipeline as part of the reindex operation. The .NET client documentation has an example of creating a pipeline and using it in a bulk API call.

or

  2. Use the Reindex API on the client. In contrast to ReindexOnServer, the client's Reindex API will pull down documents using an observable ScrollAll helper, create a destination index if need be, and bulk index documents using BulkAll, as shown in the previous answer. Reindex is a client-specific helper that existed before the reindex API existed in Elasticsearch (which maps to ReindexOnServer). An example would be something like:
    var client = new ElasticClient();
    var numberOfSlices = 2; // <-- see the sliced scroll docs for what to set this to

    // replace ILazyDocument with whichever type makes sense for your data
    var reindexObservable = client.Reindex<ILazyDocument>(r => r
        .BackPressureFactor(10)
        .ScrollAll("1m", numberOfSlices, s => s
            .Search(ss => ss
                .Index("amazoninf.iica.teste")
                .Source(sf => sf
                    .Excludes(e => e.Fields("_d", "_n"))
                )
                .Query(q => q
                    .MatchAll()
                )
            )
            .MaxDegreeOfParallelism(4) // <-- how many concurrent scroll calls
        )
        .BulkAll(b => b
            .Index("amazoninf.iica.teste.tmp")
            .Size(100) // <-- number of docs in each bulk call
            .MaxDegreeOfParallelism(2) // <-- how many concurrent bulk calls
            .RefreshOnCompleted()
        )
    );

    var waitHandle = new ManualResetEvent(false);
    Exception exception = null;

    // actions to take whilst observing the reindex process
    var reindexObserver = new ReindexObserver(
        onError: e =>
        {
            exception = e;
            waitHandle.Set();
        },
        onCompleted: () => waitHandle.Set()
    );

    // start the observable process
    reindexObservable.Subscribe(reindexObserver);

    // blocking wait for the reindexing process; optionally pass a timeout
    waitHandle.WaitOne();

    // did the observable end in exception?
    if (exception != null)
        throw exception;
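For option 1 above, creating an ingest pipeline and referencing it from the reindex destination could look roughly like the following. This is a sketch only: the pipeline name and the set processor with its field and value are made up for illustration, so substitute processors that perform your actual transformation:

    // define a pipeline; the set processor and its field/value are illustrative
    var putPipelineResponse = client.Ingest.PutPipeline("my-processing-pipeline", p => p
        .Description("processing before indexing")
        .Processors(pr => pr
            .Set<object>(s => s
                .Field("processed")
                .Value(true)
            )
        )
    );

    // reference the pipeline in the reindex destination, so each document
    // passes through it on the way into the destination index
    var reindexResponse = client.ReindexOnServer(r => r
        .Source(s => s.Index("amazoninf.iica.teste"))
        .Destination(d => d
            .Index("amazoninf.iica.teste.tmp")
            .Pipeline("my-processing-pipeline")
        )
        .WaitForCompletion(true)
    );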

Thank you very much Ross.
Very valuable information
