Bulk Observable giving refresh error on inserting documents

Hi,

I am using Elastic Cloud
Elastic Stack version : 7.17
NEST Nuget Package : 7.17

I am using the BulkAll observable to insert records in bulk and am getting the following error.
The exception message is:
Refreshing after all documents have indexed failed

The full exception is as follows:

FailureReason: BadResponse while attempting POST on https://{cloudurl}.cloud.es.io/test-22-11-2022/_refresh?pretty=true&error_trace=true

Audit trail of this API call:

  • [1] BadRequest: Node: https://{cloudurl}.gcp.cloud.es.io/ Took: 00:00:10.0288646
  • [2] MaxTimeoutReached:

OriginalException: System.Threading.Tasks.TaskCanceledException: The request was canceled due to the configured HttpClient.Timeout of 10 seconds elapsing.

---> System.TimeoutException: The operation was canceled.
---> System.Threading.Tasks.TaskCanceledException: The operation was canceled.
---> System.IO.IOException: Unable to read data from the transport connection: The I/O operation has been aborted because of either a thread exit or an application request..
---> System.Net.Sockets.SocketException (995): The I/O operation has been aborted because of either a thread exit or an application request.
--- End of inner exception stack trace ---
at System.Net.Sockets.Socket.AwaitableSocketAsyncEventArgs.ThrowException(SocketError error, CancellationToken cancellationToken)
at System.Net.Sockets.Socket.AwaitableSocketAsyncEventArgs.System.Threading.Tasks.Sources.IValueTaskSource<System.Int32>.GetResult(Int16 token)
at System.Net.Security.SslStream.EnsureFullTlsFrameAsync[TIOAdapter](TIOAdapter adapter)
at System.Net.Security.SslStream.ReadAsyncInternal[TIOAdapter](TIOAdapter adapter, Memory`1 buffer)
at System.Net.Http.HttpConnection.SendAsyncCore(HttpRequestMessage request, Boolean async, CancellationToken cancellationToken)
--- End of inner exception stack trace ---
at System.Net.Http.HttpConnection.SendAsyncCore(HttpRequestMessage request, Boolean async, CancellationToken cancellationToken)
at System.Net.Http.HttpConnectionPool.SendWithVersionDetectionAndRetryAsync(HttpRequestMessage request, Boolean async, Boolean doRequestAuth, CancellationToken cancellationToken)
at System.Net.Http.DiagnosticsHandler.SendAsyncCore(HttpRequestMessage request, Boolean async, CancellationToken cancellationToken)
at System.Net.Http.RedirectHandler.SendAsync(HttpRequestMessage request, Boolean async, CancellationToken cancellationToken)
at System.Net.Http.DecompressionHandler.SendAsync(HttpRequestMessage request, Boolean async, CancellationToken cancellationToken)
at System.Net.Http.HttpClient.g__Core|83_0(HttpRequestMessage request, HttpCompletionOption completionOption, CancellationTokenSource cts, Boolean disposeCts, CancellationTokenSource pendingRequestsCts, CancellationToken originalCancellationToken)
--- End of inner exception stack trace ---
--- End of inner exception stack trace ---
at System.Net.Http.HttpClient.HandleFailure(Exception e, Boolean telemetryStarted, HttpResponseMessage response, CancellationTokenSource cts, CancellationToken cancellationToken, CancellationTokenSource pendingRequestsCts)
at System.Net.Http.HttpClient.g__Core|83_0(HttpRequestMessage request, HttpCompletionOption completionOption, CancellationTokenSource cts, Boolean disposeCts, CancellationTokenSource pendingRequestsCts, CancellationToken originalCancellationToken)
at Elasticsearch.Net.HttpConnection.Request[TResponse](RequestData requestData)

Request:

<Request stream not captured or already read to completion by serializer. Set DisableDirectStreaming() on ConnectionSettings to force it to be set on the response.>

Response:

TCP states:

  CloseWait: 2
  Established: 66
  FinWait2: 4
  TimeWait: 6
  FinWait1: 1

ThreadPool statistics:

  Worker:
    Busy: 2
    Free: 32765
    Min: 8
    Max: 32767
  IOCP:
    Busy: 1
    Free: 999
    Min: 8
    Max: 1000

Exception:

Elasticsearch.Net.ElasticsearchClientException: Refreshing after all documents have indexed failed
at Nest.BlockingSubscribeExtensions.WaitOnObservable[TObservable,TObserve,TObserver](TObservable observable, TimeSpan maximumRunTime, Func`3 factory)
at Nest.BlockingSubscribeExtensions.Wait[T](BulkAllObservable`1 observable, TimeSpan maximumRunTime, Action`1 onNext)

CONNECTION CODE

CloudConnectionPool cloudConnectionPool = new("CLOUDID",
    new BasicAuthenticationCredentials("USER", "PASSWORD"));

ConnectionSettings _cloudConnectionSettings = new ConnectionSettings(cloudConnectionPool)
    .RequestTimeout(TimeSpan.FromSeconds(10));

_cloudConnectionSettings.PrettyJson().EnableDebugMode();

BULK INSERTION DOCUMENT CODE

// Routing value taken from the first document's MessageType
int check = (int)docs.FirstOrDefault().MessageType;
ElasticClient client = new ElasticClient(ClientProvider());
var bulkAllObservable = client.BulkAll(docs, b => b
    .BufferToBulk((descriptor, buffer) =>
    {
        // Add one index operation per buffered document
        foreach (var doc in buffer)
        {
            descriptor.Index<MessageDocument>(bi => bi
                .Index(currentIndexName)
                .Document(doc)
                .Routing(check)
            );
        }
    })
    .RetryDocumentPredicate((bulkResponseItem, doc) =>
    {
        // Retry any failed item that targeted the current index
        return bulkResponseItem.Error.Index == currentIndexName;
    })
    .DroppedDocumentCallback((bulkResponseItem, doc) =>
    {
        Console.WriteLine($"Unable to index: {bulkResponseItem} {doc}");
    }));

// Signalled when the observable completes or fails
var waitHandle = new ManualResetEvent(false);
ExceptionDispatchInfo exceptionDispatchInfo = null;

var observer = new BulkAllObserver(
    onNext: response =>
    {
        // do something e.g. write number of pages to console
    },
    onError: exception =>
    {
        exceptionDispatchInfo = ExceptionDispatchInfo.Capture(exception);
        waitHandle.Set();
    },
    onCompleted: () => waitHandle.Set());

bulkAllObservable.Subscribe(observer);

// Block until the bulk operation finishes
waitHandle.WaitOne();

// Rethrow any captured exception with its original stack trace
exceptionDispatchInfo?.Throw();
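
For reference, the 10-second HttpClient.Timeout reported in the trace matches the RequestTimeout(TimeSpan.FromSeconds(10)) set in the connection code above, and the audit trail shows the _refresh call taking just over 10 seconds. A minimal sketch, assuming the refresh only needs more time to complete, is to raise that timeout. The 60-second value is an illustrative assumption, and "CLOUDID", "USER" and "PASSWORD" are the same placeholders as above:

```csharp
using System;
using Elasticsearch.Net;
using Nest;

// Placeholders: replace with a real cloud id and credentials.
var cloudConnectionPool = new CloudConnectionPool("CLOUDID",
    new BasicAuthenticationCredentials("USER", "PASSWORD"));

// Assumption: 60 seconds is only an example value. Pick a timeout that
// comfortably covers the slowest request, including the final _refresh.
var cloudConnectionSettings = new ConnectionSettings(cloudConnectionPool)
    .RequestTimeout(TimeSpan.FromSeconds(60))
    .PrettyJson()
    .EnableDebugMode();

var client = new ElasticClient(cloudConnectionSettings);
```

This only gives the refresh more time to finish. It does not explain why the refresh is slow on this deployment.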

Welcome to our community! :smiley:

Please format your code/logs/config using the </> button, or markdown style back ticks. It helps to make things easy to read which helps us help you :slight_smile:

Is there anything in the Elasticsearch logs?

@warkolm I was trying this on the trial cloud version. On the actual cloud deployment it worked fine.
Could this be the reason?

There's really no difference between the two.
