Intermittent error while ingesting files in pipeline

Hi,

When indexing files with a bulk operation using the NEST (7.17.5) client against Elasticsearch 8.8.0 or 8.12.2, we get an error after a few successful operations. If we retry, the error happens on a different file.
If I disable direct streaming on the request, the error doesn't occur anymore.

BulkDescriptor bulkDescriptorWithPipeline = new BulkDescriptor()
    .Pipeline(EsConstants.FilesPipeline)
    // Adding this line seems to fix the issue
    .RequestConfiguration(req => req.DisableDirectStreaming());

var props = new Dictionary<string, object>() {
     { "relatedfilejoin", new { parent = parentId, name = "FILE" } },
     { "content", fileAsBase64 },
     { "_fileextension", fileExtension },
     { "_filename", fileName }
};

bulkDescriptorWithPipeline.AddOperation(new BulkIndexOperation<dynamic>(props)
{
    Index = indexName,
    Id = fileId,
    Document = props,
    Routing = parentId
});
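
For context, here is roughly how we send the request and inspect the result afterwards (a sketch; client is our IElasticClient instance, which isn't shown above):

// Sketch: sending the bulk request and checking the outcome.
// "client" is our IElasticClient instance (assumption, not shown above).
var response = client.Bulk(bulkDescriptorWithPipeline);

if (!response.IsValid)
{
    // DebugInformation contains the audit trail quoted below
    Console.WriteLine(response.DebugInformation);
}

// Per-item failures, in case the request itself succeeded
foreach (var item in response.ItemsWithErrors)
{
    Console.WriteLine($"Failed to index {item.Id}: {item.Error}");
}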

Here is the call stack:

# FailureReason: Unrecoverable/Unexpected BadRequest while attempting POST on http://127.0.0.1:9200/_bulk?pipeline=filespipeline
 - [1] BadRequest: Node: http://127.0.0.1:9200/ Exception: NotSupportedException Took: 00:00:04.3364656
# Audit exception in step 1 BadRequest:
System.NotSupportedException: The stream does not support concurrent IO read or write operations.
   at System.Net.ConnectStream.InternalWrite(Boolean async, Byte[] buffer, Int32 offset, Int32 size, AsyncCallback callback, Object state)
   at System.Net.ConnectStream.BeginWrite(Byte[] buffer, Int32 offset, Int32 size, AsyncCallback callback, Object state)
   at System.Net.Http.StreamToStreamCopy.TryStartWriteSync(Int32 bytesRead)
   at System.Net.Http.StreamToStreamCopy.StartRead()
--- End of stack trace from previous location where exception was thrown ---
   at System.Runtime.ExceptionServices.ExceptionDispatchInfo.Throw()
   at System.Runtime.CompilerServices.TaskAwaiter.HandleNonSuccessAndDebuggerNotification(Task task)
   at Elasticsearch.Net.HttpConnection.Request[TResponse](RequestData requestData)
   at Elasticsearch.Net.RequestPipeline.CallElasticsearch[TResponse](RequestData requestData)
   at Elasticsearch.Net.Transport`1.Request[TResponse](HttpMethod method, String path, PostData data, IRequestParameters requestParameters)
# Inner Exception: The stream does not support concurrent IO read or write operations.
System.NotSupportedException: The stream does not support concurrent IO read or write operations.
   at System.Net.ConnectStream.InternalWrite(Boolean async, Byte[] buffer, Int32 offset, Int32 size, AsyncCallback callback, Object state)
   at System.Net.ConnectStream.BeginWrite(Byte[] buffer, Int32 offset, Int32 size, AsyncCallback callback, Object state)
   at System.Net.Http.StreamToStreamCopy.TryStartWriteSync(Int32 bytesRead)
   at System.Net.Http.StreamToStreamCopy.StartRead()
--- End of stack trace from previous location where exception was thrown ---
   at System.Runtime.ExceptionServices.ExceptionDispatchInfo.Throw()
   at System.Runtime.CompilerServices.TaskAwaiter.HandleNonSuccessAndDebuggerNotification(Task task)
   at Elasticsearch.Net.HttpConnection.Request[TResponse](RequestData requestData)
   at Elasticsearch.Net.RequestPipeline.CallElasticsearch[TResponse](RequestData requestData)
   at Elasticsearch.Net.Transport`1.Request[TResponse](HttpMethod method, String path, PostData data, IRequestParameters requestParameters)
# Exception:
Elasticsearch.Net.UnexpectedElasticsearchClientException: The stream does not support concurrent IO read or write operations. ---> System.NotSupportedException: The stream does not support concurrent IO read or write operations.
   at System.Net.ConnectStream.InternalWrite(Boolean async, Byte[] buffer, Int32 offset, Int32 size, AsyncCallback callback, Object state)
   at System.Net.ConnectStream.BeginWrite(Byte[] buffer, Int32 offset, Int32 size, AsyncCallback callback, Object state)
   at System.Net.Http.StreamToStreamCopy.TryStartWriteSync(Int32 bytesRead)
   at System.Net.Http.StreamToStreamCopy.StartRead()
--- End of stack trace from previous location where exception was thrown ---
   at System.Runtime.ExceptionServices.ExceptionDispatchInfo.Throw()
   at System.Runtime.CompilerServices.TaskAwaiter.HandleNonSuccessAndDebuggerNotification(Task task)
   at Elasticsearch.Net.HttpConnection.Request[TResponse](RequestData requestData)
   at Elasticsearch.Net.RequestPipeline.CallElasticsearch[TResponse](RequestData requestData)
   at Elasticsearch.Net.Transport`1.Request[TResponse](HttpMethod method, String path, PostData data, IRequestParameters requestParameters)
   --- End of inner exception stack trace ---
   at Elasticsearch.Net.Transport`1.Request[TResponse](HttpMethod method, String path, PostData data, IRequestParameters requestParameters)
   at Nest.ElasticClient.Bulk(IBulkRequest request)
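
For now the only workaround we have is disabling direct streaming, either per request as above or globally on the connection settings. A minimal sketch of the global variant (the URI is just our local node):

// Sketch: disabling direct streaming for every request instead of per call.
// Note: this buffers request and response bodies in memory, which may matter
// with large base64-encoded file payloads like ours.
var settings = new ConnectionSettings(new Uri("http://127.0.0.1:9200"))
    .DisableDirectStreaming();
var client = new ElasticClient(settings);

We would prefer to understand the root cause rather than keep this enabled, though.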

I also see this warning in elasticsearch.log, but I don't know whether it is related to the issue above:

[2024-03-04T13:06:01,583][WARN ][o.e.h.AbstractHttpServerTransport] [APT02-7FHMQ93] caught exception while handling client http traffic, closing connection Netty4HttpChannel{localAddress=/127.0.0.1:9200, remoteAddress=/127.0.0.1:57403}
io.netty.handler.codec.PrematureChannelClosureException: Channel closed while still aggregating message
	at io.netty.handler.codec.MessageAggregator.channelInactive(MessageAggregator.java:436) ~[?:?]
	at io.netty.channel.AbstractChannelHandlerContext.invokeChannelInactive(AbstractChannelHandlerContext.java:305) ~[?:?]
	at io.netty.channel.AbstractChannelHandlerContext.invokeChannelInactive(AbstractChannelHandlerContext.java:281) ~[?:?]
	at io.netty.channel.AbstractChannelHandlerContext.fireChannelInactive(AbstractChannelHandlerContext.java:274) ~[?:?]
	at io.netty.channel.ChannelInboundHandlerAdapter.channelInactive(ChannelInboundHandlerAdapter.java:81) ~[?:?]
	at io.netty.handler.codec.http.HttpContentDecoder.channelInactive(HttpContentDecoder.java:235) ~[?:?]
	at io.netty.channel.AbstractChannelHandlerContext.invokeChannelInactive(AbstractChannelHandlerContext.java:305) ~[?:?]
	at io.netty.channel.AbstractChannelHandlerContext.invokeChannelInactive(AbstractChannelHandlerContext.java:281) ~[?:?]
	at io.netty.channel.AbstractChannelHandlerContext.fireChannelInactive(AbstractChannelHandlerContext.java:274) ~[?:?]
	at io.netty.channel.ChannelInboundHandlerAdapter.channelInactive(ChannelInboundHandlerAdapter.java:81) ~[?:?]
	at org.elasticsearch.http.netty4.Netty4HttpHeaderValidator.channelInactive(Netty4HttpHeaderValidator.java:186) ~[?:?]
	at io.netty.channel.AbstractChannelHandlerContext.invokeChannelInactive(AbstractChannelHandlerContext.java:305) ~[?:?]
	at io.netty.channel.AbstractChannelHandlerContext.invokeChannelInactive(AbstractChannelHandlerContext.java:281) ~[?:?]
	at io.netty.channel.AbstractChannelHandlerContext.fireChannelInactive(AbstractChannelHandlerContext.java:274) ~[?:?]
	at io.netty.handler.codec.ByteToMessageDecoder.channelInputClosed(ByteToMessageDecoder.java:411) ~[?:?]
	at io.netty.handler.codec.ByteToMessageDecoder.channelInactive(ByteToMessageDecoder.java:376) ~[?:?]
	at io.netty.channel.AbstractChannelHandlerContext.invokeChannelInactive(AbstractChannelHandlerContext.java:305) ~[?:?]
	at io.netty.channel.AbstractChannelHandlerContext.invokeChannelInactive(AbstractChannelHandlerContext.java:281) ~[?:?]
	at io.netty.channel.AbstractChannelHandlerContext.fireChannelInactive(AbstractChannelHandlerContext.java:274) ~[?:?]
	at io.netty.channel.ChannelInboundHandlerAdapter.channelInactive(ChannelInboundHandlerAdapter.java:81) ~[?:?]
	at org.elasticsearch.transport.netty4.Netty4WriteThrottlingHandler.channelInactive(Netty4WriteThrottlingHandler.java:109) ~[?:?]
	at io.netty.channel.AbstractChannelHandlerContext.invokeChannelInactive(AbstractChannelHandlerContext.java:303) ~[?:?]
	at io.netty.channel.AbstractChannelHandlerContext.invokeChannelInactive(AbstractChannelHandlerContext.java:281) ~[?:?]
	at io.netty.channel.AbstractChannelHandlerContext.fireChannelInactive(AbstractChannelHandlerContext.java:274) ~[?:?]
	at io.netty.channel.DefaultChannelPipeline$HeadContext.channelInactive(DefaultChannelPipeline.java:1405) ~[?:?]
	at io.netty.channel.AbstractChannelHandlerContext.invokeChannelInactive(AbstractChannelHandlerContext.java:301) ~[?:?]
	at io.netty.channel.AbstractChannelHandlerContext.invokeChannelInactive(AbstractChannelHandlerContext.java:281) ~[?:?]
	at io.netty.channel.DefaultChannelPipeline.fireChannelInactive(DefaultChannelPipeline.java:901) ~[?:?]
	at io.netty.channel.AbstractChannel$AbstractUnsafe$7.run(AbstractChannel.java:813) ~[?:?]
	at io.netty.util.concurrent.AbstractEventExecutor.runTask(AbstractEventExecutor.java:174) ~[?:?]
	at io.netty.util.concurrent.AbstractEventExecutor.safeExecute(AbstractEventExecutor.java:167) ~[?:?]
	at io.netty.util.concurrent.SingleThreadEventExecutor.runAllTasks(SingleThreadEventExecutor.java:470) ~[?:?]
	at io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:566) ~[?:?]
	at io.netty.util.concurrent.SingleThreadEventExecutor$4.run(SingleThreadEventExecutor.java:997) ~[?:?]
	at io.netty.util.internal.ThreadExecutorMap$2.run(ThreadExecutorMap.java:74) ~[?:?]
	at java.lang.Thread.run(Thread.java:1583) ~[?:?]
[2024-03-04T13:06:08,730][INFO ][o.e.i.b.HierarchyCircuitBreakerService] [APT02-7FHMQ93] attempting to trigger G1GC due to high heap usage [1043445192]
[2024-03-04T13:06:08,742][INFO ][o.e.i.b.HierarchyCircuitBreakerService] [APT02-7FHMQ93] GC did bring memory usage down, before [1043445192], after [685622688], allocations [6], duration [11]

Any ideas on how to solve this issue would be appreciated 🙂
