Documents not updating in Elasticsearch despite successful API requests

I am trying to update Elasticsearch documents with additional fields. I don't get any errors, but the documents do not change. I am using Elasticsearch 7.10.0 and NEST 7.17.5.

using System;
using System.Collections.Generic;
using StackExchange.Redis;
using Nest;
using CSVDataOperationsTest.Classes;
using System.Linq;
using System.Threading.Tasks;

namespace CSVDataOperationsTest
{
    public class RedisToElasticsearch
    {
        private string _stopLoss;
        private ElasticClient _client;

        public RedisToElasticsearch(
            string stopLoss,
            string elasticSearchUrl,
            string indexName)
        {
            _stopLoss = stopLoss;
            var node = new Uri(elasticSearchUrl);
            var settings = new ConnectionSettings(node)
                .DefaultIndex(indexName);
            _client = new ElasticClient(settings);
        }

        public T GetValue<T>(IDictionary<string, object> document, string key, T defaultValue = default)
        {
            if (document.TryGetValue(key, out var value))
            {
                try
                {
                    if (value is T typedValue)
                        return typedValue;

                    return (T)Convert.ChangeType(value, typeof(T));
                }
                catch
                {
                    return defaultValue;
                }
            }
            return defaultValue;
        }

        public async void BulkOverrideTickToElastic(
           List<ConnectionMultiplexer> redisConnections,
           List<RedisTableProps> redisDBNames,
           List<RedisTableProps> redisOverrideDBNames,
           RedisTableProps redisDBProps,
           string indexName,
           int customBarLengthInSeconds,
           ulong progressQuantifier,
           int elasticsearchBatchSaveSize,
           int batchSize)
        {

            RedisDbLists redisDBList = new RedisDbLists(redisOverrideDBNames, redisConnections);
            IDatabase redisDatabase = redisConnections[redisDBProps.MultiplexerNumber].GetDatabase(redisDBProps.RedisDbNumber);
            ulong currentListLength = (ulong)redisDatabase.ListLength(redisDBProps.TableName);
            ulong listStartIndex = redisDBList.GetStartIndexOfGivenListByProps(redisDBProps);
            ulong listStopIndex = listStartIndex + currentListLength - 1;

            RedisEnumerator mainLoopEnumerator = new RedisEnumerator(
                redisOverrideDBNames,
                redisConnections,
                listStartIndex,
                listStopIndex,
                batchSize,
                decrementMode: true);

            ulong currendMainLoopIndex = listStartIndex - 1;
            ISearchResponse<Dictionary<string, object>> searchResponse;
            List<TickData> currentTickDatas = new List<TickData>();
            foreach (string csvData in mainLoopEnumerator.GetPreviousListElement())
            {
                currendMainLoopIndex++;
                currentTickDatas.Add(new TickData(csvData, currendMainLoopIndex, customBarLengthInSeconds));

                if ((currendMainLoopIndex - listStartIndex) % progressQuantifier == 0)
                {
                    Console.WriteLine($"{redisDBProps.TableName} currendMainLoopIndex: {currendMainLoopIndex - listStartIndex}");
                }

                if (currentTickDatas.Count == elasticsearchBatchSaveSize)
                {
                    searchResponse = await SearchForExistingDocumentsAsync(currentTickDatas, indexName, elasticsearchBatchSaveSize);

                    ProcessBulkUpdates(searchResponse, currentTickDatas, indexName);
                    currentTickDatas.Clear(); // start a fresh batch after flushing
                }
            }
            searchResponse = await SearchForExistingDocumentsAsync(currentTickDatas, indexName, elasticsearchBatchSaveSize);
            ProcessBulkUpdates(searchResponse, currentTickDatas, indexName);
        }

        private void ProcessBulkUpdates(ISearchResponse<Dictionary<string, object>> searchResponse, List<TickData> currentTickDatas, string indexName)
        {
            if (searchResponse.IsValid && searchResponse.Hits.Any() && HasMissingFields(searchResponse))
            {
                var hitsGroupedByDate = searchResponse.Hits
                    .GroupBy(hit => GetValue<DateTime>(hit.Source, "date"))
                    .ToList();

                var duplicateDates = hitsGroupedByDate
                    .Where(g => g.Count() > 1)
                    .ToList();


                var uniqueDateHits = hitsGroupedByDate
                    .Where(g => g.Count() == 1)
                    .ToDictionary(g => g.Key, g => g.First());

                var existingDocuments = uniqueDateHits
                    .ToDictionary(kvp => kvp.Key, kvp =>
                    {
                        var hit = kvp.Value;
                        return new
                        {
                            Hit = hit,
                            Document = new TickData
                            {
                                Id = hit.Id,
                                Date = GetValue<DateTime>(hit.Source, "date"),
                                AskHighTick = GetValue<decimal>(hit.Source, "askHighTick"),
                                BidLowTick = GetValue<decimal>(hit.Source, "bidLowTick"),
                                AskVolume = GetValue<int>(hit.Source, "askVolume"),
                                BidVolume = GetValue<int>(hit.Source, "bidVolume"),
                                TradeFlag = GetValue<int>(hit.Source, $"{_stopLoss}_tradeFlag"),
                                UnixCloseTradeDate = GetValue<double>(hit.Source, $"{_stopLoss}_unixCloseTradeDate"),
                                CloseTradePrice = GetValue<string>(hit.Source, $"{_stopLoss}_closeTradePrice"),
                                OpenCloseIndexDifference = GetValue<string>(hit.Source, $"{_stopLoss}_openCloseIndexDifference")
                            }
                        };
                    });

                var bulkOperations = new List<IBulkOperation>();
                foreach (var currentTickData in currentTickDatas)
                {
                    if (existingDocuments.TryGetValue(currentTickData.Date, out var existing))
                    {
                        var existingDoc = existing.Document;
                        if (ShouldUpdateDocument(currentTickData, existingDoc))
                        {
                            var updateFields = CreateFieldsToUpdate(currentTickData);
                            bulkOperations.Add(new BulkUpdateOperation<Dictionary<string, object>, Dictionary<string, object>>(existing.Hit.Id)
                            {
                                Doc = updateFields,
                                Index = indexName
                            });
                        }
                    }
                }
                ExecuteBulkOperationsIfAny(bulkOperations, indexName);

                if (duplicateDates.Any())
                {
                    ProcessDuplicateDateDocuments(duplicateDates, currentTickDatas, indexName);
                }
            }
        }

        private void ProcessDuplicateDateDocuments(List<IGrouping<DateTime, IHit<Dictionary<string, object>>>> duplicateDateGroups,
                                       List<TickData> currentTickDatas,
                                       string indexName)
        {
            var bulkOperations = new List<IBulkOperation>();

            foreach (var group in duplicateDateGroups)
            {
                var date = group.Key;
                var currentTickForDate = currentTickDatas.FirstOrDefault(td => td.Date == date);

                if (currentTickForDate != null)
                {
                    foreach (var documentHit in group)
                    {
                        var updateFields = new Dictionary<string, object>
                        {
                            { $"{_stopLoss}_tradeFlag", currentTickForDate.TradeFlag },
                            { $"{_stopLoss}_unixCloseTradeDate", currentTickForDate.UnixCloseTradeDate },
                            { $"{_stopLoss}_closeTradePrice", currentTickForDate.CloseTradePrice },
                            { $"{_stopLoss}_openCloseIndexDifference", currentTickForDate.OpenCloseIndexDifference }
                        };

                        bulkOperations.Add(new BulkUpdateOperation<Dictionary<string, object>, Dictionary<string, object>>(documentHit.Id)
                        {
                            Doc = updateFields,
                            Index = indexName
                        });
                    }
                }
            }

            if (bulkOperations.Any())
            {
                ExecuteBulkOperationsIfAny(bulkOperations, indexName);
            }
        }

        private void ExecuteBulkOperationsIfAny(List<IBulkOperation> bulkOperations, string indexName)
        {
            if (bulkOperations.Any())
            {
                var bulkResponse = _client.Bulk(b =>
                {
                    foreach (var operation in bulkOperations)
                    {
                        b.AddOperation(operation);
                    }
                    return b.Index(indexName);
                });
                if (!bulkResponse.IsValid)
                {
                    Console.WriteLine($"Error occurred during bulk operation: {bulkResponse.DebugInformation}");
                }
            }
        }

        private async Task<ISearchResponse<Dictionary<string, object>>> SearchForExistingDocumentsAsync(
            List<TickData> tickDatas,
            string indexName,
            int batchSize)
        {
            var startDate = tickDatas.First().Date;
            var endDate = tickDatas.Last().Date;

            return await _client.SearchAsync<Dictionary<string, object>>(s => s
                .Index(indexName)
                .Size(batchSize)
                .Query(q => q
                    .DateRange(r => r
                        .Field("date")
                        .GreaterThanOrEquals(startDate)
                        .LessThanOrEquals(endDate)
                    )
                )
            );
        }

        private bool ShouldUpdateDocument(TickData currentTickData, TickData existingDoc)
        {
            return currentTickData.Date.CompareTo(existingDoc.Date) == 0 &&
                   currentTickData.AskHighTick == existingDoc.AskHighTick &&
                   currentTickData.BidLowTick == existingDoc.BidLowTick &&
                   currentTickData.AskVolume == existingDoc.AskVolume &&
                   currentTickData.BidVolume == existingDoc.BidVolume &&
                   (currentTickData.TradeFlag != existingDoc.TradeFlag ||
                    currentTickData.CloseTradePrice != existingDoc.CloseTradePrice ||
                    currentTickData.UnixCloseTradeDate != existingDoc.UnixCloseTradeDate ||
                    currentTickData.OpenCloseIndexDifference != existingDoc.OpenCloseIndexDifference);
        }

        private Dictionary<string, object> CreateFieldsToUpdate(TickData tickData)
        {
            return new Dictionary<string, object>
            {
                { $"{_stopLoss}_tradeFlag", tickData.TradeFlag },
                { $"{_stopLoss}_unixCloseTradeDate", tickData.UnixCloseTradeDate },
                { $"{_stopLoss}_closeTradePrice", tickData.CloseTradePrice },
                { $"{_stopLoss}_openCloseIndexDifference", tickData.OpenCloseIndexDifference }
            };
        }

        private bool HasMissingFields(ISearchResponse<Dictionary<string, object>> searchResponse)
        {
            var requiredFields = new[]
            {
                $"{_stopLoss}_tradeFlag",
                $"{_stopLoss}_unixCloseTradeDate",
                $"{_stopLoss}_closeTradePrice",
                $"{_stopLoss}_openCloseIndexDifference"
            };

            return searchResponse.Hits.Any(hit =>
                requiredFields.Any(field => !hit.Source.ContainsKey(field) || hit.Source[field] == null));
        }
    }
}

I am calling the BulkOverrideTickToElastic method like this:

Task[] tasks = new Task[tableNames.Count];

for (int i = 0; i < tableNames.Count; i++)
{
    int threadNumber = i;

    RedisTableProps propsInCurrentThread = redis1MinsKeyPropsList[threadNumber];
    RedisTableProps secondPropsInCurrentThread = redis2TickPropsList[threadNumber];
    IDatabase redisDatabase = redisConnections[propsInCurrentThread.MultiplexerNumber].GetDatabase(propsInCurrentThread.RedisDbNumber);

    Thread.CurrentThread.CurrentCulture = CultureInfo.CreateSpecificCulture("en-GB");
    RedisToElasticsearch redisToElast = new RedisToElasticsearch(_stopLoos, _elasticsearchUrl, indexName);
    if (tableNamesToCalculate.Contains(redis1MinsKeyPropsList[threadNumber].Year.ToString()))
    {
        tasks[threadNumber] = Task.Run(() => {
            var tableName = redis1MinsKeyPropsList[threadNumber].TableName;
            redisToElast.BulkOverrideTickToElastic(
                redisConnections,
                redis1MinsKeyPropsList,
                redis2TickPropsList,
                secondPropsInCurrentThread,
                indexName,
                customBarLengthInSeconds,
                progressQuantifier,
                _elasticsearchBatchSaveSize,
                batchSize);
        });
    }
    else
    {
        Console.WriteLine($"{propsInCurrentThread.TableName} is not in start - stop year scope");
        tasks[threadNumber] = Task.Run(() =>
        {
            return;
        });
    }
}
Task.WaitAll(tasks);

This does not work, but it does work when I run it on a single thread. When I run it in multiple threads I do not see any errors, but Elasticsearch does not update any documents.

Elasticsearch might be getting overwhelmed with simultaneous bulk update requests, especially if they target the same documents or indices.

BulkOverrideTickToElastic is marked as async void, which is risky because exceptions thrown inside it won't be caught by the calling method, leading to silent failures.

It would be better to add a delay between bulk requests so that you are not putting extra load on the cluster, and to add logging at each layer so you can see the state of the objects involved.
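
As a rough sketch (not your exact class — it only assumes an ElasticClient and a batch of IBulkOperation items like the ones you already build), the bulk step could log per-item failures and pause between batches along these lines:

using System;
using System.Collections.Generic;
using System.Linq;
using System.Threading.Tasks;
using Nest;

class BulkHelperSketch
{
    private readonly ElasticClient _client;

    public BulkHelperSketch(ElasticClient client) => _client = client;

    // Sends one batch, logs transport-level and per-item failures,
    // then pauses briefly before the caller assembles the next batch.
    public async Task ExecuteBulkWithLoggingAsync(List<IBulkOperation> bulkOperations, string indexName)
    {
        if (!bulkOperations.Any())
            return;

        var bulkResponse = await _client.BulkAsync(b =>
        {
            foreach (var operation in bulkOperations)
            {
                b.AddOperation(operation);
            }
            return b.Index(indexName);
        });

        if (!bulkResponse.IsValid)
        {
            Console.WriteLine($"Bulk request failed: {bulkResponse.DebugInformation}");
        }

        // Individual operations can fail even when the request as a whole succeeds.
        foreach (var item in bulkResponse.ItemsWithErrors)
        {
            Console.WriteLine($"Update of document {item.Id} failed: {item.Error?.Reason}");
        }

        // Short pause between batches to ease the load on the cluster (example value).
        await Task.Delay(TimeSpan.FromMilliseconds(200));
    }
}

Checking ItemsWithErrors matters because a single update can be rejected (for example on a version conflict) without the whole bulk request failing.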

There is a Microsoft blog post that talks about the pitfalls of async void.

I would also highly recommend changing this to async Task to see whether any exceptions have previously been swallowed.

Besides that, the code around your Task.Run(() => { ... }) calls also looks suspicious and error-prone to me: because BulkOverrideTickToElastic is async void, each lambda returns as soon as the method hits its first await, so Task.WaitAll finishes long before the bulk work is actually done.
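
As a minimal sketch of the shape I mean (DoWorkAsync is a made-up stand-in for BulkOverrideTickToElastic, and the worker count is arbitrary):

using System;
using System.Threading.Tasks;

class ParallelWorkerSketch
{
    // Stand-in for BulkOverrideTickToElastic; the important part is that it
    // returns Task instead of void, so failures propagate to whoever awaits it.
    static async Task DoWorkAsync(int workerNumber)
    {
        await Task.Delay(100);
        Console.WriteLine($"worker {workerNumber} finished");
    }

    static async Task Main()
    {
        const int workerCount = 4; // made-up number of workers
        var tasks = new Task[workerCount];

        for (int i = 0; i < workerCount; i++)
        {
            int workerNumber = i; // copy the loop variable before capturing it

            // The lambda returns the worker's Task, so Task.Run hands back a task
            // that only completes (or faults) once the work is really done.
            tasks[workerNumber] = Task.Run(() => DoWorkAsync(workerNumber));
        }

        // Any exception thrown by a worker surfaces here instead of being lost.
        await Task.WhenAll(tasks);
    }
}

With async void, by contrast, the lambda returns almost immediately and neither Task.WaitAll nor anything else ever observes the outcome of the bulk work.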

Happy to help further when the underlying C# code issues are fixed and we have a proper exception 🙂

I adjusted the code based on your suggestions. I also discovered an invalid response in this line:

if (searchResponse.IsValid && searchResponse.Hits.Any() && HasMissingFields(searchResponse))

The response was invalid because of a timeout: Elasticsearch was being overwhelmed, so I used a semaphore to limit the number of simultaneous threads and extended the request timeout. Now everything is working fine. I may still improve this code in the future, because I am not satisfied with how it looks.
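
Roughly, the throttling and timeout changes look like this (a simplified sketch rather than my exact code; the limit and timeout values are just examples):

using System;
using System.Threading;
using System.Threading.Tasks;
using Nest;

class ThrottledElasticSketch
{
    // Example limit shared by all workers; tune it for the cluster.
    private static readonly SemaphoreSlim Throttle = new SemaphoreSlim(4);

    // Client with a longer request timeout so slow search/bulk calls
    // no longer come back as invalid responses.
    public static ElasticClient CreateClient(string elasticsearchUrl, string indexName)
    {
        var settings = new ConnectionSettings(new Uri(elasticsearchUrl))
            .DefaultIndex(indexName)
            .RequestTimeout(TimeSpan.FromMinutes(2)); // example value
        return new ElasticClient(settings);
    }

    // Wraps one unit of Elasticsearch work so only a few run at the same time.
    public static async Task RunThrottledAsync(Func<Task> work)
    {
        await Throttle.WaitAsync();
        try
        {
            await work();
        }
        finally
        {
            Throttle.Release();
        }
    }
}

With BulkOverrideTickToElastic changed to return Task, each Task.Run body now awaits RunThrottledAsync(() => redisToElast.BulkOverrideTickToElastic(...)), so only a few bulk/search loops hit the cluster at the same time.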