I'm new to web development and working with Elastic, and I'm currently migrating a project from Elastic NEST v7 to the .NET Elastic Client v8. I've encountered an issue with bulk indexing in one of the projects.
What works
The following method works as expected and I can see the documents have been indexed when I check the Elastic Cloud:
public static async Task Flush()
{
    if (Instance._messageList.Count == 0)
        return;
    try
    {
        List<object> messageList;
        lock (_padlock)
        {
            if (Instance._messageList.Count == 0)
                return;
            messageList = new List<object>(Instance._messageList);
            Instance._messageList.Clear();
            Instance._lastFlush = DateTime.Now;
        }
        if (messageList.Count == 0)
            return;
        var bulkResponse = await Instance._client.BulkAsync(x => x
            .Index(_elasticIndex)
            .IndexMany(messageList));
        if (bulkResponse.Errors)
        {
            foreach (var item in bulkResponse.ItemsWithErrors)
            {
                Logger.LogError(item.Error?.Reason, null, true);
            }
        }
    }
    catch (Exception ex)
    {
        Debug.WriteLine(ex);
        Logger.LogError(ex);
    }
}
The messageList payload:
[0] { index = { _index = "working_index" } }
[1] { version = "1.0", short_message = "test (flushed)", full_message = "", host = "HOSTNAME", applicationname = "***", logic_facility = "***", level = 7, category = "", timestamp = "2024-12-12T15:49:46.000000" }
The response:
'Valid Elasticsearch response built from a successful (200) low level call on POST: /working_index/_bulk?pretty=true&error_trace=true
# Audit trail of this API call:
- [1] HealthyResponse: Node: https://b7da72d8bd20404fb73eb8f3eca72424.europe-west1.gcp.cloud.es.io:9243/ Took: 00:00:00.6847335
# Request:
{"index":{}}
{"index":{"_index":"working_index"}}
{"index":{}}
{"version":"1.0","short_message":"\r\ntest (flushed)","full_message":"","host":"HOSTNAME","applicationname":"***","logic_facility":"***","level":7,"category":"","timestamp":"2024-12-12T17:27:20.000000"}
# Response:
{
  "errors" : false,
  "took" : 0,
  "items" : [
    {
      "index" : {
        "_index" : "working_index",
        "_id" : "uaDpu5MBZyEfzT_DmF1O",
        "_version" : 1,
        "result" : "created",
        "_shards" : {
          "total" : 2,
          "successful" : 2,
          "failed" : 0
        },
        "_seq_no" : 1806437,
        "_primary_term" : 31,
        "status" : 201
      }
    },
    {
      "index" : {
        "_index" : "working_index",
        "_id" : "uqDpu5MBZyEfzT_DmF1O",
        "_version" : 1,
        "result" : "created",
        "_shards" : {
          "total" : 2,
          "successful" : 2,
          "failed" : 0
        },
        "_seq_no" : 1806438,
        "_primary_term" : 31,
        "status" : 201
      }
    }
  ]
}
# TCP states:
Established: 30
TimeWait: 9
# ThreadPool statistics:
Worker:
Busy: 2
Free: 32765
Min: 16
Max: 32767
IOCP:
Busy: 0
Free: 1000
Min: 1
Max: 1000'
My elastic client settings:
var settings = new ElasticsearchClientSettings(new Uri(logSettings.LoggingUri))
    .DefaultIndex(_elasticIndex)
    .RequestTimeout(TimeSpan.FromMinutes(5d))
    .EnableHttpCompression()
    .Authentication(new BasicAuthentication("working_index", "xxx"))
    .EnableDebugMode();
What doesn't work
The following method does not work as expected. It returns a successful response; however, I cannot see the indexed documents when I check Elastic Cloud:
public async Task<bool> BulkIndex(object[] docs)
{
    if (docs == null || docs.Length == 0) return false;
    var response = await _elastic.BulkAsync(x => x
        .Index(_elastic.ElasticsearchClientSettings.DefaultIndex)
        .IndexMany(docs)
    );
    return response.IsValidResponse;
}
The docs payload:
[0] { index = { _index = "broken_index" } }
[1] { TransactionDate = {12/12/2024 18:41:59}, ClickDate = {12/12/2024 18:41:59} }
The response:
Valid Elasticsearch response built from a successful (200) low level call on POST: /broken_index/_bulk?pretty=true&error_trace=true
# Audit trail of this API call:
- [1] HealthyResponse: Node: https://b7da72d8bd20404fb73eb8f3eca72424.europe-west1.gcp.cloud.es.io:9243/ Took: 00:00:01.1513287
# Request:
{"index":{}}
{"index":{"_index":"broken_index"}}
{"index":{}}
{"transactionDate":"2024-12-12T18:41:59.1448351+01:00","clickDate":"2024-12-12T18:41:59.1448428+01:00"}
# Response:
{
  "errors" : false,
  "took" : 0,
  "items" : [
    {
      "index" : {
        "_index" : "broken_index",
        "_id" : "eDn2u5MBwRlDSf6Nzl8G",
        "_version" : 1,
        "result" : "created",
        "_shards" : {
          "total" : 2,
          "successful" : 2,
          "failed" : 0
        },
        "_seq_no" : 102421921,
        "_primary_term" : 78,
        "status" : 201
      }
    },
    {
      "index" : {
        "_index" : "broken_index",
        "_id" : "eTn2u5MBwRlDSf6Nzl8G",
        "_version" : 1,
        "result" : "created",
        "_shards" : {
          "total" : 2,
          "successful" : 2,
          "failed" : 0
        },
        "_seq_no" : 102421922,
        "_primary_term" : 78,
        "status" : 201
      }
    }
  ]
}
# TCP states:
Established: 24
TimeWait: 9
SynSent: 6
CloseWait: 2
# ThreadPool statistics:
Worker:
Busy: 1
Free: 32766
Min: 16
Max: 32767
IOCP:
Busy: 0
Free: 1000
Min: 1
Max: 1000
My elastic client settings:
var productionSettings = new ElasticsearchClientSettings(new Uri(Constants.Elastic.ProductionApiBaseUrl))
    .Authentication(new BasicAuthentication(Constants.Elastic.AffiliateUser, Constants.Elastic.AffiliatePassword))
    .DefaultIndex("broken_index")
    .RequestTimeout(TimeSpan.FromMinutes(Constants.Elastic.RequestTimeoutMinutes))
    .EnableHttpCompression()
    .EnableDebugMode();
Issue:
Although the payload and response look very similar in both projects, 'BulkIndex()' does not behave as expected. The index operation succeeds (status 201), but I cannot see the documents in Elastic Cloud.
Can anyone help me identify what might be causing the issue or provide guidance on what I might be overlooking?
Update & Solution:
The issue in my case was that field names were being serialized in camelCase instead of PascalCase, as the request body above shows ("transactionDate" rather than "TransactionDate"). I fixed this by keeping my property names untouched via 'DefaultFieldNameInferrer' on the 'ElasticsearchClientSettings':
new ElasticsearchClientSettings(uri)
    .DefaultFieldNameInferrer(fieldName => fieldName);
The inferrer receives each property name and returns the same string unmodified, so documents are indexed with the original PascalCase field names.
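To illustrate the naming difference in isolation, here is a minimal sketch that needs no cluster. It uses System.Text.Json directly rather than the Elastic client, so treat it as an analogy for what the field name inferrer does, not as the client's actual code path. The 'Transaction' record and the 'FieldNameDemo' class are hypothetical names I made up for this example; only the two property names come from my payload.

```csharp
using System;
using System.Text.Json;

// Hypothetical document type mirroring the two fields in the payload above.
public record Transaction(DateTime TransactionDate, DateTime ClickDate);

public static class FieldNameDemo
{
    public static void Main()
    {
        var doc = new Transaction(
            new DateTime(2024, 12, 12, 18, 41, 59, DateTimeKind.Utc),
            new DateTime(2024, 12, 12, 18, 41, 59, DateTimeKind.Utc));

        // camelCase policy: keys are written as "transactionDate" and "clickDate",
        // which matches what the request log showed before the fix.
        var camelCase = new JsonSerializerOptions
        {
            PropertyNamingPolicy = JsonNamingPolicy.CamelCase
        };
        Console.WriteLine(JsonSerializer.Serialize(doc, camelCase));

        // No naming policy: keys are written exactly as declared in C#
        // ("TransactionDate", "ClickDate"), comparable to an identity inferrer.
        var unchanged = new JsonSerializerOptions
        {
            PropertyNamingPolicy = null
        };
        Console.WriteLine(JsonSerializer.Serialize(doc, unchanged));
    }
}
```

Comparing the two printed lines makes it easy to see why documents can be created successfully yet not show up where expected: anything that looks for "TransactionDate" will not match a field stored as "transactionDate".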