Elasticsearch BULK API with .NET for upsert

Hi Team,

I am using .NET 8 and ES Version="8.15.6". I am trying to use the bulk API with upsert logic based on the document ID. I tried the index and update commands below from Kibana, as per the documentation, and they work fine:

POST _bulk
{ "index" : { "_index" : "test", "_id" : "1" } }
{ "field1" : "value1" }
{ "update" : {"_id" : "1", "_index" : "test"} }
{ "doc" : {"field2" : "value2"} }

However, when I try to do the same from my C# .NET code, I see weird behavior. Two documents get indexed for each input document. One looks like this:

{
        "_index": "MyIndex",
        "_id": "42r-9pIBQ0r3zUFWk2vX",
        "_score": 1,
        "_source": {
          "update": {
            "_index": "MyIndex",
            "_id": "1234"
          }
        }
      }

and the other document has the actual data inside the source. So if I have 10 docs to index, I see 20 docs indexed in Elasticsearch:

{
        "_index": "MyIndex",
        "_id": "5Gr-9pIBQ0r3zUFWk2vX",
        "_score": 1,
        "_source": {
          "doc": {
            "id": "1234",
            //followed by rest of the fields
          }
        }
      }

Below is the code

var bulkIndexOperations = new List<object>();

// Check if the document exists in Elasticsearch
var existsResponse = await _elasticsearchClient.GetAsync<object>(elasticsearchId, idx => idx.Index(indexName));

if (existsResponse.Found)
{
    // Prepare an update operation: action metadata followed by the partial document
    bulkIndexOperations.Add(new { Update = new { _index = indexName, _id = documentId } });
    bulkIndexOperations.Add(new { doc = document });
}
else
{
    // Prepare an index operation: action metadata followed by the source
    bulkIndexOperations.Add(new { Index = new { _index = indexName, _id = documentId } });
    bulkIndexOperations.Add(document);
}

var bulkResponse = await _elasticsearchClient.BulkAsync(b => b
    .Index(indexName)
    .IndexMany(bulkIndexOperations));

Any idea why that weird extra document is getting indexed which has no source?

Thanks,
Moni

When I run the POST _bulk directly from Kibana and do a search, I see just the documents I inserted.
Does anyone have a reference on how to use the bulk API in .NET with the ability to differentiate between insert and update?
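
One possible direction, offered as a sketch rather than a confirmed fix: IndexMany creates one index operation per element of the collection it is given, so hand-built action objects placed in that list end up indexed as documents themselves. With the typed client, the bulk update operation can carry the upsert instead, which corresponds to an update action with "doc_as_upsert": true in the raw _bulk body. The MyDocument type, its Field1 property, and the BulkUpsertSketch class are hypothetical, and the UpdateMany, Doc, DocAsUpsert, and ItemsWithErrors members are assumed to be available in the Elastic.Clients.Elasticsearch 8.x version in use, so please verify them against your client.

```csharp
using System;
using System.Collections.Generic;
using System.Threading.Tasks;
using Elastic.Clients.Elasticsearch;

// Hypothetical document type: the thread only shows that documents expose an "Id" property.
public sealed class MyDocument
{
    public string Id { get; set; } = "";
    public string? Field1 { get; set; }
}

public static class BulkUpsertSketch
{
    // Sends one bulk request in which every document is an update action with
    // doc_as_upsert enabled: existing documents are merged, missing ones are created.
    public static async Task UpsertAsync(
        ElasticsearchClient client, string indexName, IEnumerable<MyDocument> documents)
    {
        var response = await client.BulkAsync(b => b
            .Index(indexName)
            .UpdateMany(documents, (op, doc) => op
                .Id(doc.Id)          // target _id taken from the document itself
                .Doc(doc)            // partial document to merge into an existing one
                .DocAsUpsert(true))); // use the same body as the new document if none exists

        // A bulk request can succeed as a whole while individual items fail.
        if (response.Errors)
        {
            foreach (var item in response.ItemsWithErrors)
            {
                Console.WriteLine($"Failed to upsert {item.Id}: {item.Error?.Reason}");
            }
        }
    }
}
```

With this shape there is no need to call GetAsync for each document before building the bulk request, because Elasticsearch decides per item whether it is an insert or an update.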

I'm not a C# dev, but it looks strange to me that you are getting the document by elasticsearchId and then indexing using documentId.

But I'm wondering why you want to distinguish between the first insert (when the document doesn't exist) and the later operation, since you are sending the same full document in both cases, AFAICS.
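
To illustrate that point: an index action with an explicit _id creates the document if it is missing and replaces it if it already exists, so when the full document is sent either way, a plain bulk index may be enough. The following is a minimal sketch under assumptions: the FullDocument type and the BulkIndexSketch class are hypothetical, and the IndexMany overload that takes a per-document selector is assumed to exist in the Elastic.Clients.Elasticsearch 8.x version in use.

```csharp
using System;
using System.Collections.Generic;
using System.Threading.Tasks;
using Elastic.Clients.Elasticsearch;

// Hypothetical document type: the thread only shows that documents expose an "Id" property.
public sealed class FullDocument
{
    public string Id { get; set; } = "";
    public string? Field1 { get; set; }
}

public static class BulkIndexSketch
{
    // Indexes every document under its own Id. An existing document with the
    // same _id is replaced, so no duplicate with a generated _id is created.
    public static async Task IndexWithExplicitIdsAsync(
        ElasticsearchClient client, string indexName, IEnumerable<FullDocument> documents)
    {
        var response = await client.BulkAsync(b => b
            .Index(indexName)
            .IndexMany(documents, (op, doc) => op.Id(doc.Id)));

        // Report per-item failures; the request itself may still have returned 200.
        if (response.Errors)
        {
            foreach (var item in response.ItemsWithErrors)
            {
                Console.WriteLine($"Failed to index {item.Id}: {item.Error?.Reason}");
            }
        }
    }
}
```

The trade-off is that an index action overwrites the whole stored document, so this only fits when the incoming document is always complete.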

Thanks @dadoonet for the reply. This is what I am trying to do, and I am not able to figure out how to achieve it using the ES bulk API.
I am doing the below with the single-document API, but since I have 300K+ documents I want to use the _bulk API with an appropriate batch size. Basically: search for the documentId, and if it is present, update the document, else insert it. I understand that right now it is not a partial update with only the changed fields, since those can't be determined easily. But my requirement is that if a document with the given ID already exists, I should update it rather than insert it again.

string documentId = (string)documentType.GetProperty("Id")?.GetValue(document);
if (!string.IsNullOrEmpty(documentId))
{
    var elasticsearchId = new Id(documentId);
    var existsResponse = await _elasticsearchClient.GetAsync<object>(
        elasticsearchId,
        idx => idx.Index(indexName));

    // Upsert logic based on Id
    if (existsResponse.Found) // If the document exists, update it
    {
        var updateResponse = await UpdateDocumentAsync(elasticsearchId, indexName, document);
        if (updateResponse.IsValidResponse)
        {
            documentIndexed = true; // Successfully updated
            indexedCount++; // Increment count for successful updates
        }
        else
        {
            _logger.LogError("Failed to update document from {JsonFile}: Error: {DebugInformation}. Retrying...", jsonFile, updateResponse.DebugInformation);
        }
    }
    else // If the document does not exist, insert it
    {
        var indexResponse = await IndexDocumentAsync(document, indexName, documentId);
        if (indexResponse.IsValidResponse)
        {
            documentIndexed = true; // Successfully indexed
            indexedCount++; // Increment count for successful indexing
        }
        else
        {
            _logger.LogError("Failed to index document from {JsonFile}: Error: {DebugInformation}. Retrying...", jsonFile, indexResponse.DebugInformation);
        }
    }