BulkAll observable gives a refresh error when inserting documents

Hi,

I am using Elastic Cloud
Elastic Stack version : 7.17
NEST Nuget Package : 7.17

I am using the BulkAll observable to insert records in bulk and am getting the following error.
The exception message is:
Refreshing after all documents have indexed failed

The full Exception is as follows:

FailureReason: BadResponse while attempting POST on https://{cloudurl}.cloud.es.io/test-22-11-2022/_refresh?pretty=true&error_trace=true

Audit trail of this API call:

  • [1] BadRequest: Node: https://{cloudurl}.gcp.cloud.es.io/ Took: 00:00:10.0288646
  • [2] MaxTimeoutReached:

OriginalException: System.Threading.Tasks.TaskCanceledException: The request was canceled due to the configured HttpClient.Timeout of 10 seconds elapsing.

---> System.TimeoutException: The operation was canceled.
---> System.Threading.Tasks.TaskCanceledException: The operation was canceled.
---> System.IO.IOException: Unable to read data from the transport connection: The I/O operation has been aborted because of either a thread exit or an application request..
---> System.Net.Sockets.SocketException (995): The I/O operation has been aborted because of either a thread exit or an application request.
--- End of inner exception stack trace ---
at System.Net.Sockets.Socket.AwaitableSocketAsyncEventArgs.ThrowException(SocketError error, CancellationToken cancellationToken)
at System.Net.Sockets.Socket.AwaitableSocketAsyncEventArgs.System.Threading.Tasks.Sources.IValueTaskSource<System.Int32>.GetResult(Int16 token)
at System.Net.Security.SslStream.EnsureFullTlsFrameAsync[TIOAdapter](TIOAdapter adapter)
at System.Net.Security.SslStream.ReadAsyncInternal[TIOAdapter](TIOAdapter adapter, Memory`1 buffer)
at System.Net.Http.HttpConnection.SendAsyncCore(HttpRequestMessage request, Boolean async, CancellationToken cancellationToken)
--- End of inner exception stack trace ---
at System.Net.Http.HttpConnection.SendAsyncCore(HttpRequestMessage request, Boolean async, CancellationToken cancellationToken)
at System.Net.Http.HttpConnectionPool.SendWithVersionDetectionAndRetryAsync(HttpRequestMessage request, Boolean async, Boolean doRequestAuth, CancellationToken cancellationToken)
at System.Net.Http.DiagnosticsHandler.SendAsyncCore(HttpRequestMessage request, Boolean async, CancellationToken cancellationToken)
at System.Net.Http.RedirectHandler.SendAsync(HttpRequestMessage request, Boolean async, CancellationToken cancellationToken)
at System.Net.Http.DecompressionHandler.SendAsync(HttpRequestMessage request, Boolean async, CancellationToken cancellationToken)
at System.Net.Http.HttpClient.g__Core|83_0(HttpRequestMessage request, HttpCompletionOption completionOption, CancellationTokenSource cts, Boolean disposeCts, CancellationTokenSource pendingRequestsCts, CancellationToken originalCancellationToken)
--- End of inner exception stack trace ---
--- End of inner exception stack trace ---
at System.Net.Http.HttpClient.HandleFailure(Exception e, Boolean telemetryStarted, HttpResponseMessage response, CancellationTokenSource cts, CancellationToken cancellationToken, CancellationTokenSource pendingRequestsCts)
at System.Net.Http.HttpClient.g__Core|83_0(HttpRequestMessage request, HttpCompletionOption completionOption, CancellationTokenSource cts, Boolean disposeCts, CancellationTokenSource pendingRequestsCts, CancellationToken originalCancellationToken)
at Elasticsearch.Net.HttpConnection.Request[TResponse](RequestData requestData)

Request:

<Request stream not captured or already read to completion by serializer. Set DisableDirectStreaming() on ConnectionSettings to force it to be set on the response.>

Response:

TCP states:

CloseWait: 2
Established: 66
FinWait2: 4
TimeWait: 6
FinWait1: 1

ThreadPool statistics:

Worker:
  Busy: 2
  Free: 32765
  Min: 8
  Max: 32767
IOCP:
  Busy: 1
  Free: 999
  Min: 8
  Max: 1000

Exception:

Elasticsearch.Net.ElasticsearchClientException: Refreshing after all documents have indexed failed
at Nest.BlockingSubscribeExtensions.WaitOnObservable[TObservable,TObserve,TObserver](TObservable observable, TimeSpan maximumRunTime, Func`3 factory)
at Nest.BlockingSubscribeExtensions.Wait[T](BulkAllObservable`1 observable, TimeSpan maximumRunTime, Action`1 onNext)

CONNECTION CODE
CloudConnectionPool cloudConnectionPool = new("CLOUDID",
    new BasicAuthenticationCredentials("USER", "PASSWORD"));

ConnectionSettings _cloudConnectionSettings = new ConnectionSettings(cloudConnectionPool).RequestTimeout(TimeSpan.FromSeconds(10));

_cloudConnectionSettings.PrettyJson().EnableDebugMode();

BULK INSERTION DOCUMENT CODE

int check = (int)docs.FirstOrDefault().MessageType;
ElasticClient client = new ElasticClient(ClientProvider());
var bulkAllObservable = client.BulkAll(docs, b => b
    .BufferToBulk((descriptor, buffer) =>
    {
        foreach (var doc in buffer)
        {
            descriptor.Index<MessageDocument>(bi => bi
                .Index(currentIndexName)
                .Document(doc)
                .Routing(check)
            );
        }
    })
    .RetryDocumentPredicate((bulkResponseItem, doc) =>
    {
        return bulkResponseItem.Error.Index == currentIndexName;
    })
    .DroppedDocumentCallback((bulkResponseItem, doc) =>
    {
        Console.WriteLine($"Unable to index: {bulkResponseItem} {doc}");
    }));

var waitHandle = new ManualResetEvent(false);
ExceptionDispatchInfo exceptionDispatchInfo = null;

var observer = new BulkAllObserver(
    onNext: response =>
    {
        // do something e.g. write number of pages to console
    },
    onError: exception =>
    {
        exceptionDispatchInfo = ExceptionDispatchInfo.Capture(exception);
        waitHandle.Set();
    },
    onCompleted: () => waitHandle.Set());

bulkAllObservable.Subscribe(observer);

waitHandle.WaitOne();

exceptionDispatchInfo?.Throw();

Welcome to our community! :smiley:

Please format your code/logs/config using the </> button, or markdown style back ticks. It helps to make things easy to read which helps us help you :slight_smile:

Is there anything in the Elasticsearch logs?

@warkolm I was trying this on the trial cloud version. On the actual cloud it worked fine.
Could this be the reason?

There's really no difference between the two.
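
For anyone hitting the same trace: the audit trail above shows the final POST to _refresh running into the 10 second RequestTimeout set in the connection code. This is not a confirmed diagnosis, only a minimal sketch of two things that could be tried under that assumption: a longer client timeout, and skipping the automatic refresh in BulkAll so the refresh can be issued separately with its own timeout. The 60 second and 2 minute values, the index name, the MessageDocument shape and the sample documents are illustrative placeholders, not taken from the original post.

```csharp
using System;
using System.Collections.Generic;
using Elasticsearch.Net;
using Nest;

// Placeholder credentials and index name, matching the question.
var pool = new CloudConnectionPool("CLOUDID",
    new BasicAuthenticationCredentials("USER", "PASSWORD"));
var currentIndexName = "test-22-11-2022";

// Assumption: 60 seconds is an arbitrary, more generous timeout than the original 10.
var settings = new ConnectionSettings(pool)
    .RequestTimeout(TimeSpan.FromSeconds(60));
var client = new ElasticClient(settings);

// Hypothetical sample data standing in for the real documents.
var docs = new List<MessageDocument>
{
    new MessageDocument { Id = "1", MessageType = 1 },
    new MessageDocument { Id = "2", MessageType = 1 },
};

var bulkAll = client.BulkAll(docs, b => b
    .Index(currentIndexName)
    .BackOffRetries(2)           // retry a failed bulk request up to twice
    .BackOffTime("30s")          // wait between retries
    .RefreshOnCompleted(false)); // do not call _refresh when the bulk run finishes

// Block until all pages are indexed, or give up after 15 minutes.
bulkAll.Wait(TimeSpan.FromMinutes(15), response =>
    Console.WriteLine($"Indexed page {response.Page}"));

// Refresh explicitly, with a per-request timeout for this call only.
var refresh = client.Indices.Refresh(currentIndexName, r => r
    .RequestConfiguration(c => c.RequestTimeout(TimeSpan.FromMinutes(2))));
Console.WriteLine($"Refresh succeeded: {refresh.IsValid}");

// Hypothetical document type; the real MessageDocument is not shown in the thread.
public class MessageDocument
{
    public string Id { get; set; }
    public int MessageType { get; set; }
}
```

If the documents do not need to be searchable immediately, the explicit refresh can be dropped altogether and the index's normal refresh interval left to make them visible.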