Elasticsearch BULK API with .NET for upsert

Hi Team,

I am using .NET 8 and ES Version="8.15.6". I an trying to use bulk api with a logic for upsert based on the document id. I tried the below index, update commands from Kibana as per documentation and they work fine

POST _bulk
{ "index" : { "_index" : "test", "_id" : "1" } }
{ "field1" : "value1" }
{ "update" : {"_id" : "1", "_index" : "test"} }
{ "doc" : {"field2" : "value2"} }

However when I try to do the same from my C# .NET code I am seeing a weird behavior. I see 2 docs indexed, 1 like below

{
        "_index": "MyIndex",
        "_id": "42r-9pIBQ0r3zUFWk2vX",
        "_score": 1,
        "_source": {
          "update": {
            "_index": "MyIndex",
            "_id": "1234"
          }
        }
      }

and other document with actual data inside the source. So if I have like 10 docs to be indexed what I see in elastic is 20 docs getting indexed

{
        "_index": "MyIndex",
        "_id": "5Gr-9pIBQ0r3zUFWk2vX",
        "_score": 1,
        "_source": {
          "doc": {
                "id": "1234",
                //followed by rest of the fields
         }}
}

Below is the code

List<object> bulkIndexOperations
// Check if the document exists in Elasticsearch
      var existsResponse = await _elasticsearchClient.GetAsync<object>(elasticsearchId, idx => idx.Index(indexName));

      if (existsResponse.Found)
      {
        // Prepare an update operation - index operation followed by source
        bulkIndexOperations.Add(new { Update = new { _index = indexName, _id = documentId } });
        bulkIndexOperations.Add(new { doc = document });
      }
      else
      {
        // Prepare an index operation
        bulkIndexOperations.Add(new { Index = new { _index = indexName, _id = documentId } });
        bulkIndexOperations.Add(document);
      }
var bulkResponse = await _elasticsearchClient.BulkAsync(b => b
                    .Index(indexName)
                    .IndexMany(bulkIndexOperations));

Any idea why that weird extra document is getting indexed which has no source?

Thanks,
Moni

when I run the POST _bulk directly from Kibana and do a search I see just the documents I inserted.
Does anyone has any reference around how to use for bulk apis in .NET with capability of differentiating insert and update ?

I'm not a c# dev, but it looks strange to me that you are getting the document by elasticsearchId and then indexing using documentId.

But I'm wondering why you want to do a difference between the first insert (when the document doesn't exist) and the second operation. As you are sending in both cases the same full document AFAICS.

Thanks @dadoonet for the reply. This is what I am trying to do and I am not able to figure out how to achieve it using ES bulk API.
I am doing below when using the single API but since I have 300K+ documents I want to use the _bulk API with appropriate batch size. Basically search for the documentId, if present update it else insert. Right now its not a partial update I understand with the updated fields alone since that is something can't be found out easily. But my requirement is if a document by the ID already exists I should not end up inserting it again but update it.

string documentId = (string)documentType.GetProperty("Id")?.GetValue(document);
                        if (!string.IsNullOrEmpty(documentId))
                        {
                            var elasticsearchId = new Id(documentId);
                            var existsResponse = await _elasticsearchClient.GetAsync<object>(
                                        elasticsearchId,
                                        idx => idx.Index(indexName));

                                    // Upsert logic based on Id
                                    if (existsResponse.Found) // If the document exists, update it
                                    {
                                        var updateResponse = await UpdateDocumentAsync(elasticsearchId, indexName, document);
                                        if (updateResponse.IsValidResponse)
                                        {
                                            documentIndexed = true; // Successfully updated
                                            indexedCount++; // Increment count for successful updates
                                        }
                                        else
                                        {
                                            _logger.LogError("Failed to update document from {JsonFile}: Error: {DebugInformation}. Retrying...", jsonFile, updateResponse.DebugInformation);
                                        }
                                    }
                                    else // If the document does not exist, insert it
                                    {
                                        var indexResponse = await IndexDocumentAsync(document, indexName, documentId);
                                        if (indexResponse.IsValidResponse)
                                        {
                                            documentIndexed = true; // Successfully indexed
                                            indexedCount++; // Increment count for successful indexing
                                        }
                                        else
                                        {
                                            _logger.LogError("Failed to index document from {JsonFile}: Error: {DebugInformation}. Retrying...", jsonFile, indexResponse.DebugInformation);
                                        }
                                    }

Why do you want to update instead of inserting again? Behind the scene you will end up reindexing again the document. So in 99% of the use cases, calling update is useless.

1 Like

ok lets say we forget about the upsert requirement here and I do a bulk API call for insert only, which is like the else below

else
      {
        // Prepare an index operation
        bulkIndexOperations.Add(new { Index = new { _index = indexName, _id = elasticsearchId } });
        bulkIndexOperations.Add(document);
      }

But like I mentioned my issue is why am I see 2 docs in elastic vs 1 for the same id?

Thanks @dadoonet
So this is the issue. Now I removed the if-else and just doing an Index(no update), by deleting the index from Elastic(so no prev data is available)

string documentId = ((ModelRecord)document).Id; //ModelRecord is the POCO which has Id field and the unique identifier 
var elasticsearchId = new Id(documentId);
bulkIndexOperations.Add(new { Index = new { _index = indexName, _id = elasticsearchId } });
bulkIndexOperations.Add(document);
if (bulkIndexOperations.Count >= batchSize)
    {
      var indexedCount = await ExecuteBulkIndexingAsync(bulkIndexOperations, indexName);
      bulkIndexOperations.Clear(); // Clear the list for the next batch
    }

Inside ExecuteBulkIndexingAsync() I call the bulk API

var bulkResponse = await _elasticsearchClient.BulkAsync(b => b
                    .Index(indexName)
                    .IndexMany(bulkIndexOperations));

I still see double the doc count since I have something like below in Elastic

"hits": {
    "total": {
      "value": 168630,
      "relation": "eq"
    },
    "max_score": 1,
    "hits": [
      {
        "_index": "MyIndex",
        "_id": "TJ5N-5IBk9raR26I2YG_",
        "_score": 1,
        "_source": {
          "index": {
            "_index": "MyIndex",
            "_id": "2018-17136"
          }
        }
      },
      {
        "_index": "MyIndex",
        "_id": "2018-17136",
        "_score": 1,
        "_source": { //the actual fields from the ModelRecord POCO
          "id": "2018-17136",
          "fieldA": true,
          "fieldB": false
        }
      }

It looks like the id field is used for the document id.

This document have specified _id and not id like in the other document. It looks like this may be the index record of the bulk format, meaning that you are not constructing the bulk request correctly. I am not a .NET developer so will not be able to help with the client usage.

Thanks @Christian_Dahlqvist
Yes I tried to google but did not find the latest C# .NET usage on how to make the bulk API call in the correct format. But I am sending only _id from my code as you can see below

string documentId = ((ModelRecord)document).Id; //ModelRecord is the POCO which has Id field and the unique identifier 
var elasticsearchId = new Id(documentId);
bulkIndexOperations.Add(new { Index = new { _index = indexName, _id = elasticsearchId } });
bulkIndexOperations.Add(document);

To me it looks like each of the add method calls add a new document and not a separate line to the bulk format.

{ Index = new { _index = indexName, _id = elasticsearchId } }

seems to correspond to

"_source": {
  "index": {
    "_index": "MyIndex",
    "_id": "2018-17136"
  }
}

Does the add method have any other parameters, e.g. index name, operation and document id?

1 Like

I agree. In Java, you call only once the add method with both the header (_id, _index) and the body (document).

1 Like

Based on my understanding I formed the Add. I did not find any sample for .NET in documentation on its usage.
If I remove the below line bulk API fails
bulkIndexOperations.Add(new { Index = new { _index = indexName, _id = documentId } });
where I thought when its Index its and insert and its like below its an update operation

bulkIndexOperations.Add(new { Update = new { _index = indexName, _id = documentId } });
bulkIndexOperations.Add(new { doc = document });

Here bulkIndexOperations is just a List of object(List bulkIndexOperations) so I don't know if there is a fixed format. I tried to have something to match below
_id is NOT mandatory ES will create one if not sent.

POST _bulk
{ "index" : { "_index" : "test", "_id" : "1" } }
{ "field1" : "value1" }

I am already passing indexName and documentId right? what am I missing?

If I google .NET usage it takes me to this page - Multiple documents with BulkAllObservable helper | Elasticsearch .NET Client [8.15] | Elastic
Indexing documents | Elasticsearch .NET Client [8.15] | Elastic
but says - This page has been deleted.

This is my understanding and I did not get any reference in the internet

// First add the metadata for the bulk operation (the "header")
    bulkIndexOperations.Add(new 
    { 
        Index = new 
        { 
            _index = indexName, 
            _id = documentId 
        } 
    });

    // Then add the document itself (the "body")
    bulkIndexOperations.Add(document);

So, the first line imo ({ "index": { "_index": "MyIndex", "_id": "2018-17136" } }) is the metadata, which specifies that the operation is an index operation and the target index and document ID.
The second line ({ "id": "2018-17136", "fieldA": true, "fieldB": false }) is the actual document, which contains the fields to be indexed.

Based on what you see in Elasticsearch it seems clear that the client handles these two commands as separate documents and not two lines of the bulk request.

Have you looked at the tests available within the client repo to see how it is used?

Hi @Christian_Dahlqvist I looked at the github repo and felt this is relevant elasticsearch-net/tests/Tests/Document/Multiple/Bulk/BulkApiTests.cs at main · elastic/elasticsearch-net · GitHub
But did not completely get how exactly to form the correct bulk request.

For now I am continuing with below code but I dont know how to pass the operation type - as to its Insert of Update

string documentId = ((MyRecord)document).Id;
    //Check for duplicates in the bulk operations first
    bool alreadyExistsInBulk = bulkIndexOperations.Any(op =>
    {
      if (op is MyRecord record)
      {
        return record.Id == documentId; // Check if the document ID is already in the operations list
      }
      return false;
    });

    if (!alreadyExistsInBulk)
    {
      bulkIndexOperations.Add(document);
    }
if (bulkIndexOperations.Count >= batchSize)
{
   var bulkResponse = await _elasticsearchClient.BulkAsync(b => b
                    .Index(indexName)
                    .IndexMany(bulkIndexOperations));
}

I am not a .NET developer and have therefore never used that client so will unfortunately not be able to help with this.