Migrating from Elasticsearch 7 to Elastic.Clients.Elasticsearch 8.16 - Bulk Upsert Issues

Hi there, my team is migrating from Elasticsearch 7 to Elastic.Clients.Elasticsearch 8.16, and bulk upserts stopped working correctly after I refactored my code.

I initially had the following method that worked well for indexing multiple documents:

public void BulkUpsert(string[] data)
{
    if (!data.Any())
    {
        Console.WriteLine("BulkUpsert array is empty. Continuing process.");
        return;
    }

    var indexResponse = _elastic.Bulk<StringResponse>(PostData.MultiJson(data));

    if (indexResponse.OriginalException != null || !indexResponse.Success)
    {
        Logger.LogError(indexResponse.Body, indexResponse.OriginalException);
        Console.WriteLine(indexResponse.Body);
        Console.WriteLine(indexResponse.OriginalException?.Message);
    }
}

After migrating, I refactored it to the method below. It indexes multiple documents successfully, but when I try to update a document it creates a new document with a new _id instead of updating the existing one:

public async Task<bool> BulkUpsert(object[] docs)
{
    if (docs == null || docs.Length == 0) return false;

    var response = await _elastic.BulkAsync(x => x
        .Index(_elastic.ElasticsearchClientSettings.DefaultIndex)
        .IndexMany(docs)
    );

    return response.IsValidResponse;
}

The docs parameter is structured like this:

[0] { Index = {_index = "affiliates" } }
[1] { UniqueId = "TEST123_FWW_Index", Commission = 10, CommissionFirstMonth = 0, CommissionNetto = 5, CommissionStatus = "approved", ... }
[2] { Update = { _id = "BQiL95MBwRlDSf6N4bOQ", _index = "affiliates" } }
[3] { UniqueId = "TEST123_FWW_Update", Commission = 10, CommissionFirstMonth = 0, CommissionNetto = 5, CommissionStatus = "approved", ... }
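
For reference, this is my understanding of the _bulk wire format that these action/document pairs are meant to map to: one JSON object per line, each action line followed by its payload line, and for an update action the payload wrapped in "doc" (here with "doc_as_upsert"). The sketch below uses only System.Text.Json, not the Elastic client. BulkBodySketch, ToNdjson and the placeholder id "some-existing-id" are names I made up for illustration, and the documents are trimmed to two fields.

```csharp
using System;
using System.Collections.Generic;
using System.Text;
using System.Text.Json;

public static class BulkBodySketch
{
    // Serializes each item as one JSON line. The _bulk body is
    // newline-delimited JSON and must end with a trailing newline.
    public static string ToNdjson(IEnumerable<object> lines)
    {
        var sb = new StringBuilder();
        foreach (var line in lines)
        {
            // Declared type is object, so System.Text.Json uses the runtime type.
            sb.Append(JsonSerializer.Serialize(line)).Append('\n');
        }
        return sb.ToString();
    }

    public static void Main()
    {
        var lines = new object[]
        {
            // index action, followed by the full document
            new { index = new { _index = "affiliates" } },
            new { UniqueId = "TEST123_FWW_Index", Commission = 10 },

            // update action, followed by the partial document wrapped in "doc"
            new { update = new { _index = "affiliates", _id = "some-existing-id" } },
            new
            {
                doc = new { UniqueId = "TEST123_FWW_Update", Commission = 10 },
                doc_as_upsert = true
            }
        };

        Console.Write(ToNdjson(lines));
    }
}
```

This is only meant to show the shape I expect the 8.16 client to send on my behalf; it is not how I am calling the client.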

My current Elasticsearch client settings:

var settings = new ElasticsearchClientSettings(new Uri(Constants.Elastic.ProductionApiBaseUrl))
    .Authentication(new BasicAuthentication(Constants.Elastic.AffiliateUser, Constants.Elastic.AffiliatePassword))
    .DefaultIndex("affiliates")
    .DefaultFieldNameInferrer(fieldName => fieldName)
    .RequestTimeout(TimeSpan.FromMinutes(Constants.Elastic.RequestTimeoutMinutes))
    .EnableHttpCompression();

I'm able to update a single document using this method:

public async Task<bool> UpdateDoc(string index, string id, object doc)
{
    var response = await _elastic.UpdateAsync<object, object>(index, id, u => u
        .Doc(doc));

    return response.IsValidResponse;
}

To handle bulk upserts with both index and update operations, I then tried building the bulk operations explicitly, but ran into issues:

public async Task<bool> BulkUpsert(object[] docs)
{
    if (docs == null || docs.Length == 0) return false;

    var bulkRequest = new List<IBulkOperation>();

    for (int i = 0; i < docs.Length; i += 2)
    {
        var operation = docs[i];
        var document = docs[i + 1];

        if (operation.GetType().GetProperty("Index") != null)
        {
            var indexOperation = new BulkIndexOperation<object>(document)
            {
                Index = _elastic.ElasticsearchClientSettings.DefaultIndex,
            };
            bulkRequest.Add(indexOperation);
        }
        else if (operation.GetType().GetProperty("Update") != null)
        {
            var id = GetPropertyValue<string>(operation, "Update", "_id");

            var updateOperation = new BulkUpdateOperation<object, object>(id)
            {
                Index = _elastic.ElasticsearchClientSettings.DefaultIndex,
                Id = id,
                Doc = document,
                DocAsUpsert = true
            };
            bulkRequest.Add(updateOperation);
        }
    }

    var response = await _elastic.BulkAsync(x => x
        .Index(_elastic.ElasticsearchClientSettings.DefaultIndex)
        .IndexMany(bulkRequest)
    );

    return response.IsValidResponse;
}

The index operation creates a document with an empty _source:

{
  "_index": "affiliates",
  "_id": "JKsd-JMBZyEfzT_DIKnn",
  "_version": 1,
  "_seq_no": 102620983,
  "_primary_term": 78,
  "found": true,
  "_source": {}
}

The update operation wipes the existing document's _source, as shown here:

Before update (in Kibana):

{
  "_index": "affiliates",
  "_id": "BQiL95MBwRlDSf6N4bOQ",
  "_version": 1,
  "_seq_no": 102620970,
  "_primary_term": 78,
  "found": true,
  "_source": {
    "UniqueId": "TEST123_FWW_John",
    "Commission": 10,
    "CommissionFirstMonth": 0,
    "CommissionNetto": 5,
    "CommissionStatus": "approved",
    "CommissionType": "type1",
    "ValidationDate": "2024-12-24T07:22:28",
    "EntryDate": "2024-12-24T07:22:28",
    "Issuer": "issuer1",
    "TransactionDate": "2024-12-24T07:22:28",
    "ClickDate": "2024-12-24T07:22:28",
    "ClickRef": "ref123",
    "TransactionId": "trans123",
    ...
  }
}

After update (in Kibana):

{
  "_index": "affiliates",
  "_id": "BQiL95MBwRlDSf6N4bOQ",
  "_version": 2,
  "_seq_no": 102620984,
  "_primary_term": 78,
  "found": true,
  "_source": {}
}
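
One unconfirmed guess about the empty _source: my operation objects are being serialized as if they were the documents, through their interface type. I have not verified that the Elastic client behaves this way. The sketch below only demonstrates the plain System.Text.Json behavior I have in mind, where a value serialized through an interface with no properties comes out as {}. IOperationSketch and UpdateOperationSketch are hypothetical stand-ins, not the client's types.

```csharp
using System;
using System.Text.Json;

// Hypothetical stand-ins for illustration only, NOT Elastic client types.
public interface IOperationSketch { }

public sealed class UpdateOperationSketch : IOperationSketch
{
    public string Id { get; set; } = "";
    public object Doc { get; set; } = new { };
}

public static class DeclaredTypeSketch
{
    // Serializes using the declared interface type, which exposes no properties.
    public static string AsInterface(IOperationSketch op) =>
        JsonSerializer.Serialize<IOperationSketch>(op);

    // Serializes using the runtime type, so Id and Doc are included.
    public static string AsRuntimeType(IOperationSketch op) =>
        JsonSerializer.Serialize(op, op.GetType());

    public static void Main()
    {
        IOperationSketch op = new UpdateOperationSketch
        {
            Id = "some-existing-id",
            Doc = new { Commission = 10 }
        };

        Console.WriteLine(AsInterface(op));   // {}
        Console.WriteLine(AsRuntimeType(op)); // {"Id":"some-existing-id","Doc":{"Commission":10}}
    }
}
```

If something like this is happening inside IndexMany, it would match both symptoms, but I would like confirmation from someone who knows the 8.x client.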

Any help would be greatly appreciated on:

  1. Why the update operation is removing the document data (_source).
  2. Why the index operation results in an empty document (_source).
  3. How to properly handle the bulk upsert logic in Elastic.Clients.Elasticsearch 8.16.

Thanks in advance!