Hi,
When indexing files with BulkOperation with NEST (7.17.5) client and Elastic Search 8.8.0 or 8.12.2, after a few successful operations we have an error message. If we retry, the error will happen on different file.
If I disable directstreaming on the request, the error doesn't occur anymore.
BulkDescriptor bulkDescriptorWithPipeline = new BulkDescriptor().Pipeline(EsConstants.FilesPipeline);
// Adding this line seems to fix the issue
.RequestConfiguration(req => req.DisableDirectStreaming());
var props = new Dictionary<string, object>() {
{ "relatedfilejoin", new { parent = parentId, name = "FILE" } },
{ "content", fileAsBase64 },
{ "_fileextension", fileExtension },
{ "_filename", fileName }
};
bulkDescriptorWithPipeline.AddOperation(new BulkIndexOperation<dynamic>(l_props)
{
Index = indexName,
Id = fileId,
Document = props,
Routing = parentId
});
Here is the callstack :
# FailureReason: Unrecoverable/Unexpected BadRequest while attempting POST on http://127.0.0.1:9200/_bulk?pipeline=filespipeline
- [1] BadRequest: Node: http://127.0.0.1:9200/ Exception: NotSupportedException Took: 00:00:04.3364656
# Audit exception in step 1 BadRequest:
System.NotSupportedException: The stream does not support concurrent IO read or write operations.
at System.Net.ConnectStream.InternalWrite(Boolean async, Byte[] buffer, Int32 offset, Int32 size, AsyncCallback callback, Object state)
at System.Net.ConnectStream.BeginWrite(Byte[] buffer, Int32 offset, Int32 size, AsyncCallback callback, Object state)
at System.Net.Http.StreamToStreamCopy.TryStartWriteSync(Int32 bytesRead)
at System.Net.Http.StreamToStreamCopy.StartRead()
--- End of stack trace from previous location where exception was thrown ---
at System.Runtime.ExceptionServices.ExceptionDispatchInfo.Throw()
at System.Runtime.CompilerServices.TaskAwaiter.HandleNonSuccessAndDebuggerNotification(Task task)
at Elasticsearch.Net.HttpConnection.Request[TResponse](RequestData requestData)
at Elasticsearch.Net.RequestPipeline.CallElasticsearch[TResponse](RequestData requestData)
at Elasticsearch.Net.Transport`1.Request[TResponse](HttpMethod method, String path, PostData data, IRequestParameters requestParameters)
# Inner Exception: The stream does not support concurrent IO read or write operations.
System.NotSupportedException: The stream does not support concurrent IO read or write operations.
at System.Net.ConnectStream.InternalWrite(Boolean async, Byte[] buffer, Int32 offset, Int32 size, AsyncCallback callback, Object state)
at System.Net.ConnectStream.BeginWrite(Byte[] buffer, Int32 offset, Int32 size, AsyncCallback callback, Object state)
at System.Net.Http.StreamToStreamCopy.TryStartWriteSync(Int32 bytesRead)
at System.Net.Http.StreamToStreamCopy.StartRead()
--- End of stack trace from previous location where exception was thrown ---
at System.Runtime.ExceptionServices.ExceptionDispatchInfo.Throw()
at System.Runtime.CompilerServices.TaskAwaiter.HandleNonSuccessAndDebuggerNotification(Task task)
at Elasticsearch.Net.HttpConnection.Request[TResponse](RequestData requestData)
at Elasticsearch.Net.RequestPipeline.CallElasticsearch[TResponse](RequestData requestData)
at Elasticsearch.Net.Transport`1.Request[TResponse](HttpMethod method, String path, PostData data, IRequestParameters requestParameters)
# Exception:
Elasticsearch.Net.UnexpectedElasticsearchClientException: The stream does not support concurrent IO read or write operations. ---> System.NotSupportedException: The stream does not support concurrent IO read or write operations.
at System.Net.ConnectStream.InternalWrite(Boolean async, Byte[] buffer, Int32 offset, Int32 size, AsyncCallback callback, Object state)
at System.Net.ConnectStream.BeginWrite(Byte[] buffer, Int32 offset, Int32 size, AsyncCallback callback, Object state)
at System.Net.Http.StreamToStreamCopy.TryStartWriteSync(Int32 bytesRead)
at System.Net.Http.StreamToStreamCopy.StartRead()
--- End of stack trace from previous location where exception was thrown ---
at System.Runtime.ExceptionServices.ExceptionDispatchInfo.Throw()
at System.Runtime.CompilerServices.TaskAwaiter.HandleNonSuccessAndDebuggerNotification(Task task)
at Elasticsearch.Net.HttpConnection.Request[TResponse](RequestData requestData)
at Elasticsearch.Net.RequestPipeline.CallElasticsearch[TResponse](RequestData requestData)
at Elasticsearch.Net.Transport`1.Request[TResponse](HttpMethod method, String path, PostData data, IRequestParameters requestParameters)
--- End of inner exception stack trace ---
at Elasticsearch.Net.Transport`1.Request[TResponse](HttpMethod method, String path, PostData data, IRequestParameters requestParameters)
at Nest.ElasticClient.Bulk(IBulkRequest request)
I have this warning elasticsearch.log, but I don't know if it is related to the issue above :
[2024-03-04T13:06:01,583][WARN ][o.e.h.AbstractHttpServerTransport] [APT02-7FHMQ93] caught exception while handling client http traffic, closing connection Netty4HttpChannel{localAddress=/127.0.0.1:9200, remoteAddress=/127.0.0.1:57403}
io.netty.handler.codec.PrematureChannelClosureException: Channel closed while still aggregating message
at io.netty.handler.codec.MessageAggregator.channelInactive(MessageAggregator.java:436) ~[?:?]
at io.netty.channel.AbstractChannelHandlerContext.invokeChannelInactive(AbstractChannelHandlerContext.java:305) ~[?:?]
at io.netty.channel.AbstractChannelHandlerContext.invokeChannelInactive(AbstractChannelHandlerContext.java:281) ~[?:?]
at io.netty.channel.AbstractChannelHandlerContext.fireChannelInactive(AbstractChannelHandlerContext.java:274) ~[?:?]
at io.netty.channel.ChannelInboundHandlerAdapter.channelInactive(ChannelInboundHandlerAdapter.java:81) ~[?:?]
at io.netty.handler.codec.http.HttpContentDecoder.channelInactive(HttpContentDecoder.java:235) ~[?:?]
at io.netty.channel.AbstractChannelHandlerContext.invokeChannelInactive(AbstractChannelHandlerContext.java:305) ~[?:?]
at io.netty.channel.AbstractChannelHandlerContext.invokeChannelInactive(AbstractChannelHandlerContext.java:281) ~[?:?]
at io.netty.channel.AbstractChannelHandlerContext.fireChannelInactive(AbstractChannelHandlerContext.java:274) ~[?:?]
at io.netty.channel.ChannelInboundHandlerAdapter.channelInactive(ChannelInboundHandlerAdapter.java:81) ~[?:?]
at org.elasticsearch.http.netty4.Netty4HttpHeaderValidator.channelInactive(Netty4HttpHeaderValidator.java:186) ~[?:?]
at io.netty.channel.AbstractChannelHandlerContext.invokeChannelInactive(AbstractChannelHandlerContext.java:305) ~[?:?]
at io.netty.channel.AbstractChannelHandlerContext.invokeChannelInactive(AbstractChannelHandlerContext.java:281) ~[?:?]
at io.netty.channel.AbstractChannelHandlerContext.fireChannelInactive(AbstractChannelHandlerContext.java:274) ~[?:?]
at io.netty.handler.codec.ByteToMessageDecoder.channelInputClosed(ByteToMessageDecoder.java:411) ~[?:?]
at io.netty.handler.codec.ByteToMessageDecoder.channelInactive(ByteToMessageDecoder.java:376) ~[?:?]
at io.netty.channel.AbstractChannelHandlerContext.invokeChannelInactive(AbstractChannelHandlerContext.java:305) ~[?:?]
at io.netty.channel.AbstractChannelHandlerContext.invokeChannelInactive(AbstractChannelHandlerContext.java:281) ~[?:?]
at io.netty.channel.AbstractChannelHandlerContext.fireChannelInactive(AbstractChannelHandlerContext.java:274) ~[?:?]
at io.netty.channel.ChannelInboundHandlerAdapter.channelInactive(ChannelInboundHandlerAdapter.java:81) ~[?:?]
at org.elasticsearch.transport.netty4.Netty4WriteThrottlingHandler.channelInactive(Netty4WriteThrottlingHandler.java:109) ~[?:?]
at io.netty.channel.AbstractChannelHandlerContext.invokeChannelInactive(AbstractChannelHandlerContext.java:303) ~[?:?]
at io.netty.channel.AbstractChannelHandlerContext.invokeChannelInactive(AbstractChannelHandlerContext.java:281) ~[?:?]
at io.netty.channel.AbstractChannelHandlerContext.fireChannelInactive(AbstractChannelHandlerContext.java:274) ~[?:?]
at io.netty.channel.DefaultChannelPipeline$HeadContext.channelInactive(DefaultChannelPipeline.java:1405) ~[?:?]
at io.netty.channel.AbstractChannelHandlerContext.invokeChannelInactive(AbstractChannelHandlerContext.java:301) ~[?:?]
at io.netty.channel.AbstractChannelHandlerContext.invokeChannelInactive(AbstractChannelHandlerContext.java:281) ~[?:?]
at io.netty.channel.DefaultChannelPipeline.fireChannelInactive(DefaultChannelPipeline.java:901) ~[?:?]
at io.netty.channel.AbstractChannel$AbstractUnsafe$7.run(AbstractChannel.java:813) ~[?:?]
at io.netty.util.concurrent.AbstractEventExecutor.runTask(AbstractEventExecutor.java:174) ~[?:?]
at io.netty.util.concurrent.AbstractEventExecutor.safeExecute(AbstractEventExecutor.java:167) ~[?:?]
at io.netty.util.concurrent.SingleThreadEventExecutor.runAllTasks(SingleThreadEventExecutor.java:470) ~[?:?]
at io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:566) ~[?:?]
at io.netty.util.concurrent.SingleThreadEventExecutor$4.run(SingleThreadEventExecutor.java:997) ~[?:?]
at io.netty.util.internal.ThreadExecutorMap$2.run(ThreadExecutorMap.java:74) ~[?:?]
at java.lang.Thread.run(Thread.java:1583) ~[?:?]
[2024-03-04T13:06:08,730][INFO ][o.e.i.b.HierarchyCircuitBreakerService] [APT02-7FHMQ93] attempting to trigger G1GC due to high heap usage [1043445192]
[2024-03-04T13:06:08,742][INFO ][o.e.i.b.HierarchyCircuitBreakerService] [APT02-7FHMQ93] GC did bring memory usage down, before [1043445192], after [685622688], allocations [6], duration [11]
Any idea to solve this issue would be appreciated