Hi there, my team is in the process of migrating from Elasticsearch 7 to Elastic.Clients.Elasticsearch 8.16, and I'm having some issues with bulk upsert functionality after refactoring my code.
I initially had the following method that worked well for indexing multiple documents:
public void BulkUpsert(string[] data)
{
    if (!data.Any())
    {
        Console.WriteLine("BulkUpsert array is empty. Continuing process.");
        return;
    }
    var indexResponse = _elastic.Bulk<StringResponse>(PostData.MultiJson(data));
    if (indexResponse.OriginalException != null || !indexResponse.Success)
    {
        Logger.LogError(indexResponse.Body, indexResponse.OriginalException);
        Console.WriteLine(indexResponse.Body);
        Console.WriteLine(indexResponse.OriginalException?.Message);
    }
}
However, after migrating, I refactored it to the following method. It successfully indexes multiple documents, but when I try to perform an update, it appears to create a new document with a new _id rather than updating the existing one:
public async Task<bool> BulkUpsert(object[] docs)
{
    if (docs == null || docs.Length == 0) return false;
    var response = await _elastic.BulkAsync(x => x
        .Index(_elastic.ElasticsearchClientSettings.DefaultIndex)
        .IndexMany(docs)
    );
    return response.IsValidResponse;
}
The docs parameter is structured like this:
[0] { Index = { _index = "affiliates" } }
[1] { UniqueId = "TEST123_FWW_Index", Commission = 10, CommissionFirstMonth = 0, CommissionNetto = 5, CommissionStatus = "approved", ... }
[2] { Update = { _id = "BQiL95MBwRIDSf6N4bOQ", _index = "affiliates" } }
[3] { UniqueId = "TEST123_FWW_Update", Commission = 10, CommissionFirstMonth = 0, CommissionNetto = 5, CommissionStatus = "approved", ... }
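For reference, these pairs follow the action line / source line layout of the _bulk REST endpoint, which is what the old method sent as raw JSON strings. Below is a minimal sketch of that wire format, assuming System.Text.Json and made-up values (EXAMPLE_ID and the trimmed documents are placeholders, and BulkBodySketch is a hypothetical name). On the wire the action names are lowercase, and an update action is followed by a doc wrapper rather than the bare document:

```csharp
using System;
using System.Collections.Generic;
using System.Text;
using System.Text.Json;

public static class BulkBodySketch
{
    // Serializes each object onto its own line and terminates every line
    // with '\n', which is the NDJSON shape the _bulk endpoint expects.
    public static string ToNdjson(IEnumerable<object> lines)
    {
        var sb = new StringBuilder();
        foreach (var line in lines)
        {
            sb.Append(JsonSerializer.Serialize(line)).Append('\n');
        }
        return sb.ToString();
    }

    public static void Main()
    {
        var lines = new object[]
        {
            // Index action followed by the full document.
            new { index = new { _index = "affiliates" } },
            new { UniqueId = "TEST123_FWW_Index", Commission = 10 },
            // Update action followed by a partial-document wrapper;
            // doc_as_upsert asks Elasticsearch to insert the doc if the _id is missing.
            new { update = new { _id = "EXAMPLE_ID", _index = "affiliates" } },
            new { doc = new { UniqueId = "TEST123_FWW_Update", Commission = 10 }, doc_as_upsert = true },
        };
        Console.Write(ToNdjson(lines));
    }
}
```

The point of the sketch is only that each operation is two lines, and that the first line decides how the second one is interpreted.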
My current Elasticsearch client settings:
var settings = new ElasticsearchClientSettings(new Uri(Constants.Elastic.ProductionApiBaseUrl))
    .Authentication(new BasicAuthentication(Constants.Elastic.AffiliateUser, Constants.Elastic.AffiliatePassword))
    .DefaultIndex("affiliates")
    .DefaultFieldNameInferrer(fieldName => fieldName)
    .RequestTimeout(TimeSpan.FromMinutes(Constants.Elastic.RequestTimeoutMinutes))
    .EnableHttpCompression();
I'm able to update a single document using this method:
public async Task<bool> UpdateDoc(string index, string id, object doc)
{
    var response = await _elastic.UpdateAsync<object, object>(index, id, u => u
        .Doc(doc));
    return response.IsValidResponse;
}
However, when attempting to handle bulk upserts (with both index and update operations), I tried creating a bulk request but encountered issues:
public async Task<bool> BulkUpsert(object[] docs)
{
    if (docs == null || docs.Length == 0) return false;
    var bulkRequest = new List<IBulkOperation>();
    for (int i = 0; i < docs.Length; i += 2)
    {
        var operation = docs[i];
        var document = docs[i + 1];
        if (operation.GetType().GetProperty("Index") != null)
        {
            var indexOperation = new BulkIndexOperation<object>(document)
            {
                Index = _elastic.ElasticsearchClientSettings.DefaultIndex,
            };
            bulkRequest.Add(indexOperation);
        }
        else if (operation.GetType().GetProperty("Update") != null)
        {
            var id = GetPropertyValue<string>(operation, "Update", "_id");
            var updateOperation = new BulkUpdateOperation<object, object>(id)
            {
                Index = _elastic.ElasticsearchClientSettings.DefaultIndex,
                Id = id,
                Doc = document,
                DocAsUpsert = true
            };
            bulkRequest.Add(updateOperation);
        }
    }
    var response = await _elastic.BulkAsync(x => x
        .Index(_elastic.ElasticsearchClientSettings.DefaultIndex)
        .IndexMany(bulkRequest)
    );
    return response.IsValidResponse;
}
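GetPropertyValue in the method above is a small reflection helper that is not shown in the post. A minimal sketch of what such a helper might look like, assuming it simply walks a chain of property names (this body is hypothetical and the real helper may differ):

```csharp
using System;

public static class ReflectionSketch
{
    // Follows a chain of property names (e.g. "Update", then "_id") and
    // returns default(T) when any link is missing or the final value is not a T.
    public static T GetPropertyValue<T>(object source, params string[] path)
    {
        object current = source;
        foreach (var name in path)
        {
            if (current == null) return default;
            var property = current.GetType().GetProperty(name);
            if (property == null) return default;
            current = property.GetValue(current);
        }
        return current is T typed ? typed : default;
    }

    public static void Main()
    {
        // Same shape as the operation entries in the docs array; the id is a placeholder.
        var operation = new { Update = new { _id = "EXAMPLE_ID", _index = "affiliates" } };
        Console.WriteLine(GetPropertyValue<string>(operation, "Update", "_id"));
        Console.WriteLine(GetPropertyValue<string>(operation, "Index", "_id") ?? "(none)");
    }
}
```

With a helper like this, a missing Update or _id property yields null instead of throwing, so the caller has to check the id before building the update operation.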
The index operation creates a document with an empty _source:
{
  "_index": "affiliates",
  "_id": "JKsd-JMBZyEfzT_DIKnn",
  "_version": 1,
  "_seq_no": 102620983,
  "_primary_term": 78,
  "found": true,
  "_source": {}
}
The update operation wipes the existing document's _source, so the document ends up looking like this:
Before update (in Kibana):
{
  "_index": "affiliates",
  "_id": "BQiL95MBwRlDSf6N4bOQ",
  "_version": 1,
  "_seq_no": 102620970,
  "_primary_term": 78,
  "found": true,
  "_source": {
    "UniqueId": "TEST123_FWW_John",
    "Commission": 10,
    "CommissionFirstMonth": 0,
    "CommissionNetto": 5,
    "CommissionStatus": "approved",
    "CommissionType": "type1",
    "ValidationDate": "2024-12-24T07:22:28",
    "EntryDate": "2024-12-24T07:22:28",
    "Issuer": "issuer1",
    "TransactionDate": "2024-12-24T07:22:28",
    "ClickDate": "2024-12-24T07:22:28",
    "ClickRef": "ref123",
    "TransactionId": "trans123",
    ...
  }
}
After update (in Kibana):
{
  "_index": "affiliates",
  "_id": "BQiL95MBwRlDSf6N4bOQ",
  "_version": 2,
  "_seq_no": 102620984,
  "_primary_term": 78,
  "found": true,
  "_source": {}
}
Any help would be greatly appreciated on:
- Why the update operation is removing the document data (_source).
- Why the index operation results in an empty document (_source).
- How to properly handle the bulk upsert logic in Elastic.Clients.Elasticsearch 8.16.
Thanks in advance!