Hi,
I am using Elastic Cloud
Elastic Stack version : 7.17
NEST Nuget Package : 7.17
I am consuming bulk Observable indexing with and trying to insert records in bulk and facing the following error
The Exception message is :
refreshing after all documents have indexed failed
The full Exception is as follows:
FailureReason: BadResponse while attempting POST on https://{cloudurl}.cloud.es.io/test-22-11-2022/_refresh?pretty=true&error_trace=true
Audit trail of this API call:
- [1] BadRequest: Node: https://{cloudurl}.gcp.cloud.es.io/ Took: 00:00:10.0288646
- [2] MaxTimeoutReached:
OriginalException: System.Threading.Tasks.TaskCanceledException: The request was canceled due to the configured HttpClient.Timeout of 10 seconds elapsing.
---> System.TimeoutException: The operation was canceled.
---> System.Threading.Tasks.TaskCanceledException: The operation was canceled.
---> System.IO.IOException: Unable to read data from the transport connection: The I/O operation has been aborted because of either a thread exit or an application request..
---> System.Net.Sockets.SocketException (995): The I/O operation has been aborted because of either a thread exit or an application request.
--- End of inner exception stack trace ---
at System.Net.Sockets.Socket.AwaitableSocketAsyncEventArgs.ThrowException(SocketError error, CancellationToken cancellationToken)
at System.Net.Sockets.Socket.AwaitableSocketAsyncEventArgs.System.Threading.Tasks.Sources.IValueTaskSource<System.Int32>.GetResult(Int16 token)
at System.Net.Security.SslStream.EnsureFullTlsFrameAsync[TIOAdapter](TIOAdapter adapter)
at System.Net.Security.SslStream.ReadAsyncInternal[TIOAdapter](TIOAdapter adapter, Memory`1 buffer)
at System.Net.Http.HttpConnection.SendAsyncCore(HttpRequestMessage request, Boolean async, CancellationToken cancellationToken)
--- End of inner exception stack trace ---
at System.Net.Http.HttpConnection.SendAsyncCore(HttpRequestMessage request, Boolean async, CancellationToken cancellationToken)
at System.Net.Http.HttpConnectionPool.SendWithVersionDetectionAndRetryAsync(HttpRequestMessage request, Boolean async, Boolean doRequestAuth, CancellationToken cancellationToken)
at System.Net.Http.DiagnosticsHandler.SendAsyncCore(HttpRequestMessage request, Boolean async, CancellationToken cancellationToken)
at System.Net.Http.RedirectHandler.SendAsync(HttpRequestMessage request, Boolean async, CancellationToken cancellationToken)
at System.Net.Http.DecompressionHandler.SendAsync(HttpRequestMessage request, Boolean async, CancellationToken cancellationToken)
at System.Net.Http.HttpClient.g__Core|83_0(HttpRequestMessage request, HttpCompletionOption completionOption, CancellationTokenSource cts, Boolean disposeCts, CancellationTokenSource pendingRequestsCts, CancellationToken originalCancellationToken)
--- End of inner exception stack trace ---
--- End of inner exception stack trace ---
at System.Net.Http.HttpClient.HandleFailure(Exception e, Boolean telemetryStarted, HttpResponseMessage response, CancellationTokenSource cts, CancellationToken cancellationToken, CancellationTokenSource pendingRequestsCts)
at System.Net.Http.HttpClient.g__Core|83_0(HttpRequestMessage request, HttpCompletionOption completionOption, CancellationTokenSource cts, Boolean disposeCts, CancellationTokenSource pendingRequestsCts, CancellationToken originalCancellationToken)
at Elasticsearch.Net.HttpConnection.Request[TResponse](RequestData requestData)
Request:
<Request stream not captured or already read to completion by serializer. Set DisableDirectStreaming() on ConnectionSettings to force it to be set on the response.>
Response:
TCP states:
CloseWait: 2
Established: 66
FinWait2: 4
TimeWait: 6
FinWait1: 1
ThreadPool statistics:
Worker:
Busy: 2
Free: 32765
Min: 8
Max: 32767
IOCP:
Busy: 1
Free: 999
Min: 8
Max: 1000
Exception:
Elasticsearch.Net.ElasticsearchClientException: Refreshing after all documents have indexed failed
at Nest.BlockingSubscribeExtensions.WaitOnObservable[TObservable,TObserve,TObserver](TObservable observable, TimeSpan maximumRunTime, Func3 factory) at Nest.BlockingSubscribeExtensions.Wait[T](BulkAllObservable
1 observable, TimeSpan maximumRunTime, Action`1 onNext)
CONNECTION CODE
CloudConnectionPool cloudConnectionPool = new("CLOUDID",
new BasicAuthenticationCredentials("USER", "PASSWORD"));
ConnectionSettings _cloudConnectionSettings = new ConnectionSettings(cloudConnectionPool).RequestTimeout(TimeSpan.FromSeconds(10));
_cloudConnectionSettings.PrettyJson().EnableDebugMode();
BULK INSERTION DOCUMENT CODE
int check = (int)docs.FirstOrDefault().MessageType;
ElasticClient client = new ElasticClient(ClientProvider());
var bulkAllObservable = client.BulkAll(docs, b => b
.BufferToBulk((descriptor, buffer) =>
{
foreach (var doc in buffer)
{
descriptor.Index<MessageDocument>(bi => bi
.Index(currentIndexName)
.Document(doc)
.Routing(check)
);
}
})
.RetryDocumentPredicate((bulkResponseItem, doc) =>
{
return bulkResponseItem.Error.Index == currentIndexName;
})
.DroppedDocumentCallback((bulkResponseItem, doc) =>
{
Console.WriteLine($"Unable to index: {bulkResponseItem} {doc}");
}));
var waitHandle = new ManualResetEvent(false);
ExceptionDispatchInfo exceptionDispatchInfo = null;
var observer = new BulkAllObserver(
onNext: response =>
{
// do something e.g. write number of pages to console
},
onError: exception =>
{
exceptionDispatchInfo = ExceptionDispatchInfo.Capture(exception);
waitHandle.Set();
},
onCompleted: () => waitHandle.Set());
bulkAllObservable.Subscribe(observer);
waitHandle.WaitOne();
exceptionDispatchInfo?.Throw();