I am trying to update Elasticsearch documents with additional fields. I don't get any errors, but the documents do not change. I am using Elasticsearch 7.10.0 and NEST 7.17.5.
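To show what I mean by "additional fields": for each existing document I want to merge in a few extra, stop-loss specific fields via a partial-document bulk update. A minimal, self-contained sketch of that operation is below; the URL, index name, document id and the field names/values are made up for illustration only.

using System;
using System.Collections.Generic;
using Nest;

class PartialUpdateSketch
{
    static void Main()
    {
        // Hypothetical cluster and index, only for illustration.
        var settings = new ConnectionSettings(new Uri("http://localhost:9200"))
            .DefaultIndex("my-index");
        var client = new ElasticClient(settings);

        // The additional fields I want to add to an already indexed document.
        var extraFields = new Dictionary<string, object>
        {
            { "SL10_tradeFlag", 1 },
            { "SL10_unixCloseTradeDate", 1672531200d },
            { "SL10_closeTradePrice", "1.2345" },
            { "SL10_openCloseIndexDifference", "42" }
        };

        // Partial-document update of one existing document through the bulk API.
        var response = client.Bulk(b => b
            .Index("my-index")
            .Update<Dictionary<string, object>, Dictionary<string, object>>(u => u
                .Id("existing-document-id")
                .Doc(extraFields)));

        Console.WriteLine(response.IsValid ? "Bulk request accepted" : response.DebugInformation);
    }
}

My full class, which builds the same kind of update per batch, is this: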
using System;
using System.Collections.Generic;
using StackExchange.Redis;
using Nest;
using CSVDataOperationsTest.Classes;
using System.Linq;
using System.Threading.Tasks;

namespace CSVDataOperationsTest
{
    public class RedisToElasticsearch
    {
        private string _stopLoss;
        private ElasticClient _client;

        public RedisToElasticsearch(
            string stopLoss,
            string elasticSearchUrl,
            string indexName)
        {
            _stopLoss = stopLoss;
            var node = new Uri(elasticSearchUrl);
            var settings = new ConnectionSettings(node)
                .DefaultIndex(indexName);
            _client = new ElasticClient(settings);
        }
        // Reads a value from an Elasticsearch _source dictionary and converts it to T,
        // falling back to defaultValue if the key is missing or the conversion fails.
        public T GetValue<T>(IDictionary<string, object> document, string key, T defaultValue = default)
        {
            if (document.TryGetValue(key, out var value))
            {
                try
                {
                    if (value is T typedValue)
                        return typedValue;
                    return (T)Convert.ChangeType(value, typeof(T));
                }
                catch
                {
                    return defaultValue;
                }
            }
            return defaultValue;
        }
        // Reads tick rows from a Redis list, collects them into TickData objects and
        // pushes partial-document updates for the matching Elasticsearch documents.
        public async void BulkOverrideTickToElastic(
            List<ConnectionMultiplexer> redisConnections,
            List<RedisTableProps> redisDBNames,
            List<RedisTableProps> redisOverrideDBNames,
            RedisTableProps redisDBProps,
            string indexName,
            int customBarLengthInSeconds,
            ulong progressQuantifier,
            int elasticsearchBatchSaveSize,
            int batchSize)
        {
            RedisDbLists redisDBList = new RedisDbLists(redisOverrideDBNames, redisConnections);
            IDatabase redisDatabase = redisConnections[redisDBProps.MultiplexerNumber].GetDatabase(redisDBProps.RedisDbNumber);
            ulong currentListLength = (ulong)redisDatabase.ListLength(redisDBProps.TableName);
            ulong listStartIndex = redisDBList.GetStartIndexOfGivenListByProps(redisDBProps);
            ulong listStopIndex = listStartIndex + currentListLength - 1;
            RedisEnumerator mainLoopEnumerator = new RedisEnumerator(
                redisOverrideDBNames,
                redisConnections,
                listStartIndex,
                listStopIndex,
                batchSize,
                decrementMode: true);
            ulong currentMainLoopIndex = listStartIndex - 1;
            ISearchResponse<Dictionary<string, object>> searchResponse;
            List<TickData> currentTickDatas = new List<TickData>();
            foreach (string csvData in mainLoopEnumerator.GetPreviousListElement())
            {
                currentMainLoopIndex++;
                currentTickDatas.Add(new TickData(csvData, currentMainLoopIndex, customBarLengthInSeconds));
                if ((currentMainLoopIndex - listStartIndex) % progressQuantifier == 0)
                {
                    Console.WriteLine($"{redisDBProps.TableName} currentMainLoopIndex: {currentMainLoopIndex - listStartIndex}");
                }
                if (currentTickDatas.Count == elasticsearchBatchSaveSize)
                {
                    searchResponse = await SearchForExistingDocumentsAsync(currentTickDatas, indexName, elasticsearchBatchSaveSize);
                    ProcessBulkUpdates(searchResponse, currentTickDatas, indexName);
                }
            }
            // Flush whatever is left after the loop.
            searchResponse = await SearchForExistingDocumentsAsync(currentTickDatas, indexName, elasticsearchBatchSaveSize);
            ProcessBulkUpdates(searchResponse, currentTickDatas, indexName);
        }
        // Matches search hits to the current batch by date and queues partial-document
        // updates for documents whose stop-loss fields differ from the incoming tick.
        private void ProcessBulkUpdates(ISearchResponse<Dictionary<string, object>> searchResponse, List<TickData> currentTickDatas, string indexName)
        {
            if (searchResponse.IsValid && searchResponse.Hits.Any() && HasMissingFields(searchResponse))
            {
                var hitsGroupedByDate = searchResponse.Hits
                    .GroupBy(hit => GetValue<DateTime>(hit.Source, "date"))
                    .ToList();
                var duplicateDates = hitsGroupedByDate
                    .Where(g => g.Count() > 1)
                    .ToList();
                var uniqueDateHits = hitsGroupedByDate
                    .Where(g => g.Count() == 1)
                    .ToDictionary(g => g.Key, g => g.First());
                var existingDocuments = uniqueDateHits
                    .ToDictionary(kvp => kvp.Key, kvp =>
                    {
                        var hit = kvp.Value;
                        return new
                        {
                            Hit = hit,
                            Document = new TickData
                            {
                                Id = hit.Id,
                                Date = GetValue<DateTime>(hit.Source, "date"),
                                AskHighTick = GetValue<decimal>(hit.Source, "askHighTick"),
                                BidLowTick = GetValue<decimal>(hit.Source, "bidLowTick"),
                                AskVolume = GetValue<int>(hit.Source, "askVolume"),
                                BidVolume = GetValue<int>(hit.Source, "bidVolume"),
                                TradeFlag = GetValue<int>(hit.Source, $"{_stopLoss}_tradeFlag"),
                                UnixCloseTradeDate = GetValue<double>(hit.Source, $"{_stopLoss}_unixCloseTradeDate"),
                                CloseTradePrice = GetValue<string>(hit.Source, $"{_stopLoss}_closeTradePrice"),
                                OpenCloseIndexDifference = GetValue<string>(hit.Source, $"{_stopLoss}_openCloseIndexDifference")
                            }
                        };
                    });
                var bulkOperations = new List<IBulkOperation>();
                foreach (var currentTickData in currentTickDatas)
                {
                    if (existingDocuments.TryGetValue(currentTickData.Date, out var existing))
                    {
                        var existingDoc = existing.Document;
                        if (ShouldUpdateDocument(currentTickData, existingDoc))
                        {
                            var updateFields = CreateFieldsToUpdate(currentTickData);
                            bulkOperations.Add(new BulkUpdateOperation<Dictionary<string, object>, Dictionary<string, object>>(existing.Hit.Id)
                            {
                                Doc = updateFields,
                                Index = indexName
                            });
                        }
                    }
                }
                ExecuteBulkOperationsIfAny(bulkOperations, indexName);
                if (duplicateDates.Any())
                {
                    ProcessDuplicateDateDocuments(duplicateDates, currentTickDatas, indexName);
                }
            }
        }
        // When several documents share the same date, applies the same stop-loss
        // fields to every document in that group.
        private void ProcessDuplicateDateDocuments(List<IGrouping<DateTime, IHit<Dictionary<string, object>>>> duplicateDateGroups,
            List<TickData> currentTickDatas,
            string indexName)
        {
            var bulkOperations = new List<IBulkOperation>();
            foreach (var group in duplicateDateGroups)
            {
                var date = group.Key;
                var currentTickForDate = currentTickDatas.FirstOrDefault(td => td.Date == date);
                if (currentTickForDate != null)
                {
                    foreach (var documentHit in group)
                    {
                        var updateFields = new Dictionary<string, object>
                        {
                            { $"{_stopLoss}_tradeFlag", currentTickForDate.TradeFlag },
                            { $"{_stopLoss}_unixCloseTradeDate", currentTickForDate.UnixCloseTradeDate },
                            { $"{_stopLoss}_closeTradePrice", currentTickForDate.CloseTradePrice },
                            { $"{_stopLoss}_openCloseIndexDifference", currentTickForDate.OpenCloseIndexDifference }
                        };
                        bulkOperations.Add(new BulkUpdateOperation<Dictionary<string, object>, Dictionary<string, object>>(documentHit.Id)
                        {
                            Doc = updateFields,
                            Index = indexName
                        });
                    }
                }
            }
            if (bulkOperations.Any())
            {
                ExecuteBulkOperationsIfAny(bulkOperations, indexName);
            }
        }
        // Sends the queued operations in a single bulk request and logs failures.
        private void ExecuteBulkOperationsIfAny(List<IBulkOperation> bulkOperations, string indexName)
        {
            if (bulkOperations.Any())
            {
                var bulkResponse = _client.Bulk(b =>
                {
                    foreach (var operation in bulkOperations)
                    {
                        b.AddOperation(operation);
                    }
                    return b.Index(indexName);
                });
                if (!bulkResponse.IsValid)
                {
                    Console.WriteLine($"Error occurred during bulk operation: {bulkResponse.DebugInformation}");
                }
            }
        }
        // Fetches existing documents whose "date" falls within the batch's date range.
        private async Task<ISearchResponse<Dictionary<string, object>>> SearchForExistingDocumentsAsync(
            List<TickData> tickDatas,
            string indexName,
            int batchSize)
        {
            var startDate = tickDatas.First().Date;
            var endDate = tickDatas.Last().Date;
            return await _client.SearchAsync<Dictionary<string, object>>(s => s
                .Index(indexName)
                .Size(batchSize)
                .Query(q => q
                    .DateRange(r => r
                        .Field("date")
                        .GreaterThanOrEquals(startDate)
                        .LessThanOrEquals(endDate)
                    )
                )
            );
        }
        // A document qualifies for an update only when the price/volume fields match
        // the incoming tick but at least one of the stop-loss fields differs.
        private bool ShouldUpdateDocument(TickData currentTickData, TickData existingDoc)
        {
            return currentTickData.Date.CompareTo(existingDoc.Date) == 0 &&
                   currentTickData.AskHighTick == existingDoc.AskHighTick &&
                   currentTickData.BidLowTick == existingDoc.BidLowTick &&
                   currentTickData.AskVolume == existingDoc.AskVolume &&
                   currentTickData.BidVolume == existingDoc.BidVolume &&
                   (currentTickData.TradeFlag != existingDoc.TradeFlag ||
                    currentTickData.CloseTradePrice != existingDoc.CloseTradePrice ||
                    currentTickData.UnixCloseTradeDate != existingDoc.UnixCloseTradeDate ||
                    currentTickData.OpenCloseIndexDifference != existingDoc.OpenCloseIndexDifference);
        }
        private Dictionary<string, object> CreateFieldsToUpdate(TickData tickData)
        {
            return new Dictionary<string, object>
            {
                { $"{_stopLoss}_tradeFlag", tickData.CloseTradePrice },
                { $"{_stopLoss}_unixCloseTradeDate", tickData.UnixCloseTradeDate },
                { $"{_stopLoss}_closeTradePrice", tickData.CloseTradePrice },
                { $"{_stopLoss}_openCloseIndexDifference", tickData.OpenCloseIndexDifference }
            };
        }
        // True when at least one hit is missing, or has a null value for, any of the
        // stop-loss specific fields.
        private bool HasMissingFields(ISearchResponse<Dictionary<string, object>> searchResponse)
        {
            var requiredFields = new[]
            {
                $"{_stopLoss}_tradeFlag",
                $"{_stopLoss}_unixCloseTradeDate",
                $"{_stopLoss}_closeTradePrice",
                $"{_stopLoss}_openCloseIndexDifference"
            };
            return searchResponse.Hits.Any(hit =>
                requiredFields.Any(field => !hit.Source.ContainsKey(field) || hit.Source[field] == null));
        }
    }
}
I am running the BulkOverrideTickToElastic method like this:
Task[] tasks = new Task[tableNames.Count];
for (int i = 0; i < tableNames.Count; i++)
{
    int threadNumber = i;
    RedisTableProps propsInCurrentThread = redis1MinsKeyPropsList[threadNumber];
    RedisTableProps secondPropsInCurrentThread = redis2TickPropsList[threadNumber];
    IDatabase redisDatabase = redisConnections[propsInCurrentThread.MultiplexerNumber].GetDatabase(propsInCurrentThread.RedisDbNumber);
    Thread.CurrentThread.CurrentCulture = CultureInfo.CreateSpecificCulture("en-GB");
    RedisToElasticsearch redisToElast = new RedisToElasticsearch(_stopLoos, _elasticsearchUrl, indexName);
    if (tableNamesToCalculate.Contains(redis1MinsKeyPropsList[threadNumber].Year.ToString()))
    {
        tasks[threadNumber] = Task.Run(() =>
        {
            var tableName = redis1MinsKeyPropsList[threadNumber].TableName;
            redisToElast.BulkOverrideTickToElastic(
                redisConnections,
                redis1MinsKeyPropsList,
                redis2TickPropsList,
                secondPropsInCurrentThread,
                indexName,
                customBarLengthInSeconds,
                progressQuantifier,
                _elasticsearchBatchSaveSize,
                batchSize);
        });
    }
    else
    {
        Console.WriteLine($"{propsInCurrentThread.TableName} is not in start - stop year scope");
        tasks[threadNumber] = Task.Run(() =>
        {
            return;
        });
    }
}
Task.WaitAll(tasks);
Run this way it does not work, but it does work when I run it on a single thread only. With multiple threads I do not see any errors, but Elasticsearch does not update any documents.
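To verify, I look for documents that already carry one of the new fields; a simplified sketch of that check is below ("my-index" and the field name are placeholders for my real index and whatever prefix _stopLoss produces):

using System;
using System.Collections.Generic;
using Nest;

class UpdateCheckSketch
{
    static void Main()
    {
        var settings = new ConnectionSettings(new Uri("http://localhost:9200"))
            .DefaultIndex("my-index");
        var client = new ElasticClient(settings);

        // Count documents that already carry one of the new stop-loss fields.
        var countResponse = client.Count<Dictionary<string, object>>(c => c
            .Index("my-index")
            .Query(q => q
                .Exists(e => e.Field("SL10_tradeFlag"))));

        Console.WriteLine($"Documents with the new field: {countResponse.Count}");
    }
}

After the multi-threaded run the documents still do not have the extra fields, while a single-threaded run adds them as expected. What could make the multi-threaded version silently skip the updates?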