I am using .NET 8 and ES Version="8.15.6". I am trying to use the bulk API with upsert logic based on the document ID. I tried the index and update commands below from Kibana, as per the documentation, and they work fine.
When I run the POST _bulk directly from Kibana and then do a search, I see just the documents I inserted.
Does anyone have a reference for how to use the bulk API in .NET in a way that differentiates between insert and update?
I'm not a C# dev, but it looks strange to me that you are getting the document by elasticsearchId and then indexing using documentId.
But I'm wondering why you want to distinguish between the first insert (when the document doesn't exist) and the second operation, as you are sending the same full document in both cases AFAICS.
Thanks @dadoonet for the reply. This is what I am trying to do and I am not able to figure out how to achieve it using ES bulk API.
I am doing the below with the single-document API, but since I have 300K+ documents I want to use the _bulk API with an appropriate batch size. Basically: search for the documentId; if it is present, update the document, else insert it. Right now it is not a partial update with only the changed fields, since those cannot be determined easily. But my requirement is that if a document with that ID already exists, I should not end up inserting it again but should update it.
    string documentId = (string)documentType.GetProperty("Id")?.GetValue(document);
    if (!string.IsNullOrEmpty(documentId))
    {
        var elasticsearchId = new Id(documentId);
        // Look the document up by ID to decide between update and insert
        var existsResponse = await _elasticsearchClient.GetAsync<object>(
            elasticsearchId,
            idx => idx.Index(indexName));
        // Upsert logic based on Id
        if (existsResponse.Found) // If the document exists, update it
        {
            var updateResponse = await UpdateDocumentAsync(elasticsearchId, indexName, document);
            if (updateResponse.IsValidResponse)
            {
                documentIndexed = true; // Successfully updated
                indexedCount++; // Increment count for successful updates
            }
            else
            {
                _logger.LogError("Failed to update document from {JsonFile}: Error: {DebugInformation}. Retrying...", jsonFile, updateResponse.DebugInformation);
            }
        }
        else // If the document does not exist, insert it
        {
            var indexResponse = await IndexDocumentAsync(document, indexName, documentId);
            if (indexResponse.IsValidResponse)
            {
                documentIndexed = true; // Successfully indexed
                indexedCount++; // Increment count for successful indexing
            }
            else
            {
                _logger.LogError("Failed to index document from {JsonFile}: Error: {DebugInformation}. Retrying...", jsonFile, indexResponse.DebugInformation);
            }
        }
    }
Why do you want to update instead of indexing again? Behind the scenes, an update ends up reindexing the whole document anyway. So in 99% of use cases, calling update is useless.
Thanks @dadoonet
So this is the issue. I have now removed the if-else and am just doing an index (no update), after deleting the index from Elasticsearch (so no previous data is available).
    string documentId = ((ModelRecord)document).Id; // ModelRecord is the POCO which has the Id field, the unique identifier
    var elasticsearchId = new Id(documentId);
    bulkIndexOperations.Add(new { Index = new { _index = indexName, _id = elasticsearchId } });
    bulkIndexOperations.Add(document);
    if (bulkIndexOperations.Count >= batchSize)
    {
        var indexedCount = await ExecuteBulkIndexingAsync(bulkIndexOperations, indexName);
        bulkIndexOperations.Clear(); // Clear the list for the next batch
    }
Inside ExecuteBulkIndexingAsync() I call the bulk API:
    var bulkResponse = await _elasticsearchClient.BulkAsync(b => b
        .Index(indexName)
        .IndexMany(bulkIndexOperations));
I still see double the document count, since I have something like the below in Elasticsearch.
It looks like the id field is used for the document id.
This document has _id specified and not id as in the other document. It looks like this may be the index record of the bulk format, meaning that you are not constructing the bulk request correctly. I am not a .NET developer so will not be able to help with the client usage.
Thanks @Christian_Dahlqvist
Yes, I searched but did not find any up-to-date C# .NET example of how to make the bulk API call in the correct format. But I am sending only _id from my code, as you can see below.
    string documentId = ((ModelRecord)document).Id; // ModelRecord is the POCO which has the Id field, the unique identifier
    var elasticsearchId = new Id(documentId);
    bulkIndexOperations.Add(new { Index = new { _index = indexName, _id = elasticsearchId } });
    bulkIndexOperations.Add(document);
I formed the Add calls based on my own understanding. I did not find any sample for .NET in the documentation showing its usage.
If I remove the line below, the bulk API fails: bulkIndexOperations.Add(new { Index = new { _index = indexName, _id = documentId } });
I thought that when it is Index it is an insert, and when it is like the below it is an update operation.
Here bulkIndexOperations is just a list of objects (List<object> bulkIndexOperations), so I don't know whether there is a fixed format. I tried to produce something that matches the below.
_id is NOT mandatory; ES will create one if it is not sent.
This is my understanding; I did not find any reference for it on the internet.
    // First add the metadata for the bulk operation (the "header")
    bulkIndexOperations.Add(new
    {
        Index = new
        {
            _index = indexName,
            _id = documentId
        }
    });
    // Then add the document itself (the "body")
    bulkIndexOperations.Add(document);
So the first line, imo ({ "index": { "_index": "MyIndex", "_id": "2018-17136" } }), is the metadata, which specifies that the operation is an index operation, plus the target index and document ID.
The second line ({ "id": "2018-17136", "fieldA": true, "fieldB": false }) is the actual document, which contains the fields to be indexed.
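For reference, the two-line format described above can be reproduced without the Elasticsearch client at all. The following is a minimal sketch that only builds the newline-delimited JSON body the _bulk endpoint expects; MyRecord, its FieldA/FieldB properties and the BulkBody helper are hypothetical names chosen to mirror the sample document, not part of any library.

```csharp
using System.Collections.Generic;
using System.Text;
using System.Text.Json;

// Hypothetical stand-in for the POCO in this thread; only Id is known from the posts.
public record MyRecord(string Id, bool FieldA, bool FieldB);

public static class BulkBody
{
    // Builds an NDJSON _bulk body: for each document, one action line
    // followed by one source line, each terminated by a newline.
    public static string Build(string indexName, IEnumerable<MyRecord> records)
    {
        var camelCase = new JsonSerializerOptions
        {
            PropertyNamingPolicy = JsonNamingPolicy.CamelCase
        };
        var body = new StringBuilder();
        foreach (var record in records)
        {
            // Action line: { "index": { "_index": ..., "_id": ... } }
            var action = new Dictionary<string, object>
            {
                ["index"] = new Dictionary<string, string>
                {
                    ["_index"] = indexName,
                    ["_id"] = record.Id
                }
            };
            body.Append(JsonSerializer.Serialize(action)).Append('\n');
            // Source line: the document itself
            body.Append(JsonSerializer.Serialize(record, camelCase)).Append('\n');
        }
        return body.ToString();
    }
}
```

Calling BulkBody.Build("my-index", records) yields two lines per record. The action line is metadata for the request and never becomes a document; if an object shaped like it is handed to a client method that expects documents, it will most likely be serialized as a document of its own.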
Based on what you see in Elasticsearch it seems clear that the client handles these two commands as separate documents and not two lines of the bulk request.
Have you looked at the tests available within the client repo to see how it is used?
For now I am continuing with the code below, but I don't know how to pass the operation type, i.e. whether it is an insert or an update.
    string documentId = ((MyRecord)document).Id;
    // Check for duplicates in the bulk operations first
    bool alreadyExistsInBulk = bulkIndexOperations.Any(op =>
    {
        if (op is MyRecord record)
        {
            return record.Id == documentId; // Check if the document ID is already in the operations list
        }
        return false;
    });
    if (!alreadyExistsInBulk)
    {
        bulkIndexOperations.Add(document);
    }
    if (bulkIndexOperations.Count >= batchSize)
    {
        var bulkResponse = await _elasticsearchClient.BulkAsync(b => b
            .Index(indexName)
            .IndexMany(bulkIndexOperations));
        bulkIndexOperations.Clear(); // Clear the list for the next batch, as in the earlier snippet
    }
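On the operation type: in the bulk API an index action creates the document if the ID is new and replaces it if the ID already exists, so it already behaves like an upsert of the full document. A create action fails when the ID exists, and an update action is meant for partial documents. Below is a rough sketch of how the batch could be sent with an explicit ID per document. It assumes the Elastic.Clients.Elasticsearch 8.x package and its IndexMany overload that takes a per-document configuration callback; it has not been run against a cluster here. BulkIndexer, Deduplicate and IndexBatchAsync are hypothetical names, and MyRecord is reduced to the one property known from the thread.

```csharp
using System.Collections.Generic;
using System.Linq;
using System.Threading.Tasks;
using Elastic.Clients.Elasticsearch;

// Hypothetical POCO; only the Id property is known from the thread.
public sealed class MyRecord
{
    public string Id { get; set; } = "";
}

public static class BulkIndexer
{
    // Drops repeated IDs within one batch, keeping the last occurrence.
    public static List<MyRecord> Deduplicate(IEnumerable<MyRecord> batch) =>
        batch.GroupBy(r => r.Id).Select(g => g.Last()).ToList();

    // Sends one batch as "index" actions. The list holds documents only;
    // the client is expected to generate the action lines itself.
    public static async Task<BulkResponse> IndexBatchAsync(
        ElasticsearchClient client, string indexName, IEnumerable<MyRecord> batch)
    {
        var documents = Deduplicate(batch);
        return await client.BulkAsync(b => b
            .Index(indexName)
            // Assumption: this overload sets _id explicitly for each document.
            .IndexMany(documents, (op, doc) => op.Id(doc.Id)));
    }
}
```

The caller can inspect IsValidResponse and DebugInformation on the returned response, as the single-document code earlier in the thread does. Grouping by ID replaces the linear Any() scan over the list, which matters for batches drawn from 300K+ documents.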