Elasticsearch saveJsonToEs (ES-Hadoop library) dropping documents without throwing an error or warning

Hi, we are using the Elasticsearch Hadoop library to index documents from a Spark job. We are facing a weird issue where some documents are not getting indexed, and no error or exception is thrown.
Method used: rdd.saveJsonToEs.

We have set the batch size to 5000, and if we try to index, say, 15K documents, then around 1,500-2,000 of them don't get indexed. We are using ES version 6.2.3. Any leads would be highly appreciated!
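For context, the write looks roughly like the sketch below (index name, field names, and the RDD are illustrative placeholders, not the actual job code; running it requires a live cluster):

```scala
import org.apache.spark.rdd.RDD
import org.elasticsearch.spark.rdd.EsSpark

// RDD of pre-serialized JSON strings, one document per element,
// produced earlier in the job (placeholder here).
val jsonDocs: RDD[String] = ???

// Index into a hypothetical "my-index/doc" resource; es-hadoop
// settings are passed as a plain Map of strings.
EsSpark.saveJsonToEs(jsonDocs, "my-index/doc", Map(
  "es.mapping.id"         -> "docId",  // we supply our own document IDs
  "es.batch.size.entries" -> "5000"    // flush a bulk request every 5000 entries
))
```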

That is definitely surprising. Have you checked all the Spark logs (the driver and all task attempts) and the Elasticsearch logs on all Elasticsearch nodes? Are you using your own document ID or letting Elasticsearch generate it for you? If you try to insert one of the failed JSON documents by itself, what happens?

I need to check the Spark logs. We are using our own key to insert the documents. If we try to insert the failed JSON documents individually, it sometimes succeeds and sometimes fails again. The main thing we noticed is that the number of dropped documents goes up sharply during peak traffic hours.

Any leads on this? There are no logs on the Spark side that indicate a problem.

I don't have access to a 6.2 cluster right now, so I'm not 100% sure how it behaves. But I would expect to see something in your Spark logs (one of the executors or the driver), or maybe in your Elasticsearch logs. That said, given that you said

The main thing we noticed is that the number of dropped documents goes up sharply during peak traffic hours.

that leads me to guess that your cluster is under stress and you're probably getting rejections because the write queues are growing too large. How are you throttling writes to Elasticsearch from Spark? It's pretty easy to overwhelm an Elasticsearch cluster with writes from a Spark cluster: es-spark does not do any throttling at all and leaves that to the user. See [FEATURE] es.batch.write.wait - time to wait between each bulk · Issue #1400 · elastic/elasticsearch-hadoop · GitHub for example.
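Since es-hadoop has no built-in rate limiter, throttling is usually approximated by shrinking the number of concurrent writer tasks and the bulk size, and by leaning on the connector's retry settings. A hedged sketch (all values are illustrative, and `jsonDocs` stands in for whatever RDD the job writes):

```scala
import org.apache.spark.rdd.RDD
import org.elasticsearch.spark.rdd.EsSpark

val jsonDocs: RDD[String] = ???  // the RDD of JSON documents (placeholder)

// Fewer partitions means fewer tasks writing to Elasticsearch at once,
// which caps the number of concurrent bulk requests.
val throttled = jsonDocs.coalesce(8)

EsSpark.saveJsonToEs(throttled, "my-index/doc", Map(
  "es.batch.size.entries"      -> "1000", // smaller bulks per flush
  "es.batch.write.retry.count" -> "6",    // retry bulk items rejected by ES
  "es.batch.write.retry.wait"  -> "60s"   // back off between retries
))
```

The retry settings matter here because bulk-queue rejections are per-item: the connector retries only the rejected items, so a longer wait and more retries give a stressed cluster time to drain its queues.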

We don't do any client-level throttling as of now. But we are not getting any throttling errors from either the Spark side or the Elasticsearch side; otherwise it would have been easy to track what was failing.
All we are doing right now is passing the configuration "es.batch.size.entries" with a value of 5000.

You probably need to do some throttling.

We have set the batch size to 5000

Have you tried lowering that?

Do you have a way we can reproduce the problem? I don't remember ever seeing silent failures without some message in a log somewhere. What exactly is the code that you're executing? And how do you know that there are documents that went into saveJsonToEs but are not making it into Elasticsearch? And you haven't mentioned Elasticsearch logs yet -- is there anything interesting in there at the time?

Right now there is no straightforward way to reproduce this. I am trying to reproduce it in a local environment but have been unable to so far.
In the code, we have two RDDs (one for metadata and the other for the text document), basically a parent-child relationship. In the first call to rdd.saveJsonToEs we insert the metadata, and in the second call to rdd.saveJsonToEs we pass the configuration "es.mapping.routing" mapped to the ID of the parent document. So in one job, if we have 5000 documents, then 5000 metadata documents are inserted, and then another call inserts the 5000 child documents.
We have tried reducing the batch size to 2000 but the issue still persists. We also checked whether document size was a problem, but we found that even documents whose content size is almost zero are getting dropped.
In the ES logs, I can only see this transport-layer exception:
[2023-01-14T09:16:43,967][WARN ][o.e.t.n.Netty4Transport ] [es-cc-nam01-c006-master-vm0] exception caught on transport layer [NettyTcpChannel{localAddress=/10.6.10.10:9300, remoteAddress=/10.6.10.10:42036}], closing connection
io.netty.handler.codec.DecoderException: java.io.StreamCorruptedException: invalid internal transport message format, got (16,3,0,0)
at io.netty.handler.codec.ByteToMessageDecoder.callDecode(ByteToMessageDecoder.java:459) ~[netty-codec-4.1.16.Final.jar:4.1.16.Final]
at io.netty.handler.codec.ByteToMessageDecoder.channelInputClosed(ByteToMessageDecoder.java:392) ~[netty-codec-4.1.16.Final.jar:4.1.16.Final]
at io.netty.handler.codec.ByteToMessageDecoder.channelInputClosed(ByteToMessageDecoder.java:359) ~[netty-codec-4.1.16.Final.jar:4.1.16.Final]
at io.netty.handler.codec.ByteToMessageDecoder.channelInactive(ByteToMessageDecoder.java:342) ~[netty-codec-4.1.16.Final.jar:4.1.16.Final]
at io.netty.channel.AbstractChannelHandlerContext.invokeChannelInactive(AbstractChannelHandlerContext.java:245) [netty-transport-4.1.16.Final.jar:4.1.16.Final]
at io.netty.channel.AbstractChannelHandlerContext.invokeChannelInactive(AbstractChannelHandlerContext.java:231) [netty-transport-4.1.16.Final.jar:4.1.16.Final]
at io.netty.channel.AbstractChannelHandlerContext.fireChannelInactive(AbstractChannelHandlerContext.java:224) [netty-transport-4.1.16.Final.jar:4.1.16.Final]
at io.netty.handler.logging.LoggingHandler.channelInactive(LoggingHandler.java:167) [netty-handler-4.1.16.Final.jar:4.1.16.Final]
at io.netty.channel.AbstractChannelHandlerContext.invokeChannelInactive(AbstractChannelHandlerContext.java:245) [netty-transport-4.1.16.Final.jar:4.1.16.Final]
at io.netty.channel.AbstractChannelHandlerContext.invokeChannelInactive(AbstractChannelHandlerContext.java:231) [netty-transport-4.1.16.Final.jar:4.1.16.Final]
at io.netty.channel.AbstractChannelHandlerContext.fireChannelInactive(AbstractChannelHandlerContext.java:224) [netty-transport-4.1.16.Final.jar:4.1.16.Final]
at io.netty.channel.DefaultChannelPipeline$HeadContext.channelInactive(DefaultChannelPipeline.java:1354) [netty-transport-4.1.16.Final.jar:4.1.16.Final]
at io.netty.channel.AbstractChannelHandlerContext.invokeChannelInactive(AbstractChannelHandlerContext.java:245) [netty-transport-4.1.16.Final.jar:4.1.16.Final]
at io.netty.channel.AbstractChannelHandlerContext.invokeChannelInactive(AbstractChannelHandlerContext.java:231) [netty-transport-4.1.16.Final.jar:4.1.16.Final]
at io.netty.channel.DefaultChannelPipeline.fireChannelInactive(DefaultChannelPipeline.java:917) [netty-transport-4.1.16.Final.jar:4.1.16.Final]
at io.netty.channel.AbstractChannel$AbstractUnsafe$8.run(AbstractChannel.java:822) [netty-transport-4.1.16.Final.jar:4.1.16.Final]
at io.netty.util.concurrent.AbstractEventExecutor.safeExecute(AbstractEventExecutor.java:163) [netty-common-4.1.16.Final.jar:4.1.16.Final]
at io.netty.util.concurrent.SingleThreadEventExecutor.runAllTasks(SingleThreadEventExecutor.java:403) [netty-common-4.1.16.Final.jar:4.1.16.Final]
at io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:463) [netty-transport-4.1.16.Final.jar:4.1.16.Final]
at io.netty.util.concurrent.SingleThreadEventExecutor$5.run(SingleThreadEventExecutor.java:858) [netty-common-4.1.16.Final.jar:4.1.16.Final]
at java.lang.Thread.run(Thread.java:750) [?:1.8.0_342]
Caused by: java.io.StreamCorruptedException: invalid internal transport message format, got (16,3,0,0)
at org.elasticsearch.transport.TcpTransport.validateMessageHeader(TcpTransport.java:1283) ~[elasticsearch-6.2.3.jar:6.2.3]
at org.elasticsearch.transport.netty4.Netty4SizeHeaderFrameDecoder.decode(Netty4SizeHeaderFrameDecoder.java:36) ~[?:?]
at io.netty.handler.codec.ByteToMessageDecoder.decodeRemovalReentryProtection(ByteToMessageDecoder.java:489) ~[?:?]
at io.netty.handler.codec.ByteToMessageDecoder.callDecode(ByteToMessageDecoder.java:428) ~[?:?]

That exception looks like java.io.StreamCorruptedException: invalid internal transport message format, got (16,3,3,0). I don't know whether it is relevant to your Spark problem, but it seems worth fixing. It would also be good to upgrade to a supported version if at all possible.