java.io.StreamCorruptedException after running bulk indexing for some time

I found this exception after running bulk indexing for some time.
Any ideas on why this happens?

    [2015-07-26 20:44:20,458][WARN ][monitor.jvm              ] [metrics-datastore-3-es-tune-2] [gc][young][58116][4701] duration [1.2s], collections [1]/[2s], total [1.2s]/[1.2h], memory [982.1mb]->[1.4gb]/[3.4gb], all_pools {[young] [167.6mb]->[40.6mb]/[1.4gb]}{[survivor] [81.6mb]->[186.6mb]/[186.6mb]}{[old] [732.9mb]->[1.1gb]/[1.8gb]}
    [2015-07-26 20:44:30,322][WARN ][monitor.jvm              ] [metrics-datastore-3-es-tune-2] [gc][young][58125][4702] duration [1.1s], collections [1]/[1.6s], total [1.1s]/[1.2h], memory [1.9gb]->[1.4gb]/[3.4gb], all_pools {[young] [970.6mb]->[6.3mb]/[1.4gb]}{[survivor] [186.6mb]->[186.6mb]/[186.6mb]}{[old] [875mb]->[1.2gb]/[1.8gb]}
    [2015-07-26 20:56:44,858][ERROR][marvel.agent.exporter    ] [metrics-datastore-3-es-tune-2] error sending data to [http://0.0.0.0:9200/.marvel-2015.07.26/_bulk]: SocketTimeoutException[Read timed out]
    [2015-07-26 20:56:44,888][WARN ][monitor.jvm              ] [metrics-datastore-3-es-tune-2] [gc][young][58144][4705] duration [11.8m], collections [2]/[11.9m], total [11.8m]/[1.4h], memory [2.6gb]->[1.6gb]/[3.4gb], all_pools {[young] [1.3gb]->[32.3mb]/[1.4gb]}{[survivor] [73.6mb]->[0b]/[186.6mb]}{[old] [1.2gb]->[1.6gb]/[1.8gb]}
    [2015-07-26 20:56:45,258][WARN ][transport.netty          ] [metrics-datastore-3-es-tune-2] Message read past expected size (request) for requestId=[201542], action [indices:data/write/bulk[s]], readerIndex [325055] vs expected [324553]; resetting
    [2015-07-26 20:56:45,269][WARN ][transport.netty          ] [metrics-datastore-3-es-tune-2] exception caught on transport layer [[id: 0xd2310d88, /172.31.43.121:48390 :> /172.31.42.39:9300]], closing connection
    java.io.StreamCorruptedException: invalid internal transport message format, got (2f,31,2e,30)
            at org.elasticsearch.transport.netty.SizeHeaderFrameDecoder.decode(SizeHeaderFrameDecoder.java:63)
            at org.elasticsearch.common.netty.handler.codec.frame.FrameDecoder.callDecode(FrameDecoder.java:425)
            at org.elasticsearch.common.netty.handler.codec.frame.FrameDecoder.messageReceived(FrameDecoder.java:303)
            at org.elasticsearch.common.netty.channel.SimpleChannelUpstreamHandler.handleUpstream(SimpleChannelUpstreamHandler.java:70)
            at org.elasticsearch.common.netty.channel.DefaultChannelPipeline.sendUpstream(DefaultChannelPipeline.java:564)
            at org.elasticsearch.common.netty.channel.DefaultChannelPipeline$DefaultChannelHandlerContext.sendUpstream(DefaultChannelPipeline.java:791)
            at org.elasticsearch.common.netty.OpenChannelsHandler.handleUpstream(OpenChannelsHandler.java:74)
            at org.elasticsearch.common.netty.channel.DefaultChannelPipeline.sendUpstream(DefaultChannelPipeline.java:564)
            at org.elasticsearch.common.netty.channel.DefaultChannelPipeline.sendUpstream(DefaultChannelPipeline.java:559)
            at org.elasticsearch.common.netty.channel.Channels.fireMessageReceived(Channels.java:268)
            at org.elasticsearch.common.netty.channel.Channels.fireMessageReceived(Channels.java:255)
            at org.elasticsearch.common.netty.channel.socket.nio.NioWorker.read(NioWorker.java:88)
            at org.elasticsearch.common.netty.channel.socket.nio.AbstractNioWorker.process(AbstractNioWorker.java:108)
            at org.elasticsearch.common.netty.channel.socket.nio.AbstractNioSelector.run(AbstractNioSelector.java:337)
            at org.elasticsearch.common.netty.channel.socket.nio.AbstractNioWorker.run(AbstractNioWorker.java:89)
            at org.elasticsearch.common.netty.channel.socket.nio.NioWorker.run(NioWorker.java:178)
            at org.elasticsearch.common.netty.util.ThreadRenamingRunnable.run(ThreadRenamingRunnable.java:108)
            at org.elasticsearch.common.netty.util.internal.DeadLockProofWorker$1.run(DeadLockProofWorker.java:42)
            at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
            at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
            at java.lang.Thread.run(Thread.java:745)
    [2015-07-26 20:56:45,378][WARN ][transport.netty          ] [metrics-datastore-3-es-tune-2] Message read past expected size (request) for requestId=[140106], action [indices:data/write/bulk[s]], readerIndex [494150] vs expected [206610]; resetting
    [2015-07-26 20:56:45,463][WARN ][transport.netty          ] [metrics-datastore-3-es-tune-2] Message read past expected size (request) for requestId=[140127], action [indices:data/write/bulk[s]], readerIndex [537280] vs expected [293365]; resetting
    [2015-07-26 20:56:54,890][ERROR][marvel.agent.exporter    ] [metrics-datastore-3-es-tune-2] create failure (index:[.marvel-2015.07.26] type: [node_stats]): EsRejectedExecutionException[rejected execution (queue capacity 100) on org.elasticsearch.action.support.replication.TransportShardReplicationOperationAction$PrimaryPhase$1@42d0b94]
    [2015-07-26 20:58:15,003][ERROR][marvel.agent.exporter    ] [metrics-datastore-3-es-tune-2] error sending data to [http://0.0.0.0:9200/.marvel-2015.07.26/_bulk]: SocketTimeoutException[Read timed out]
    [2015-07-26 20:59:25,084][ERROR][marvel.agent.exporter    ] [metrics-datastore-3-es-tune-2] error sending data to [http://0.0.0.0:9200/.marvel-2015.07.26/_bulk]: SocketTimeoutException[Read timed out]
    [2015-07-26 20:59:44,888][WARN ][discovery.zen            ] [metrics-datastore-3-es-tune-2] master left (reason = failed to ping, tried [3] times, each with  maximum [1m] timeout), current nodes: {[Fan Boy][jI4l9n_qTXa-NMsJZYwAAg][ip-172-31-34-71.us-west-2.compute.internal][inet[/172.31.34.71:9300]]{client=true, data=false},[metrics-datastore-3-es-tune-2][gIcddoKtTxuJ5xNySKUsiQ][ip-172-31-42-39.us-west-2.compute.internal][inet[ip-172-31-42-39.us-west-2.compute.internal/172.31.42.39:9300]]{max_local_storage_nodes=1},[Hitman][B2tGcvGHS_GRbRjznxwRxQ][ip-172-31-35-196.us-west-2.compute.internal][inet[/172.31.35.196:9300]]{client=true, data=false},[metrics-datastore-1-es-tune-2][X-Mx6vzFRMOypoyLvZFEBw][ip-172-31-37-68.us-west-2.compute.internal][inet[/172.31.37.68:9300]]{max_local_storage_nodes=1},[metrics-datastore-4-es-tune-2][XP8JLUtHRru9eM5val303w][ip-172-31-45-64.us-west-2.compute.internal][inet[/172.31.45.64:9300]]{max_local_storage_nodes=1},[Man Mountain Marko][6eoPM3w5Rt2d5OWtpylC_A][ip-172-31-43-121.us-west-2.compute.internal][inet[/172.31.43.121:9300]]{client=true, data=false},[metrics-datastore-5-es-tune-2][iuVngJ8YQX6DgGJGGw7JNQ][ip-172-31-41-185.us-west-2.compute.internal][inet[/172.31.41.185:9300]]{max_local_storage_nodes=1},}

It's not really my area of expertise, but more information about your environment would help in understanding the issue. Are there any specifics to your deployment, in particular the Elasticsearch version and the client language/library?

It looks like your node(s) are overloaded; you're getting thread pool rejections.

Sorry for leaving out the basic information:

This occurred on a 4-node Elasticsearch 1.7.0 setup, each node being a c3.xlarge on AWS. There are 3 NodeClient (1.7.0) instances doing bulk inserts in batches of 8k-12k documents at roughly 3-4 second intervals, each document being < 2k in size. These 3 clients run as bolts in Storm.
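
Roughly, each bolt accumulates tuples and flushes a bulk request like the simplified sketch below (the class name, cluster name, index/type names, and flush threshold are illustrative only, not the exact production code):

    import backtype.storm.task.OutputCollector;
    import backtype.storm.task.TopologyContext;
    import backtype.storm.topology.OutputFieldsDeclarer;
    import backtype.storm.topology.base.BaseRichBolt;
    import backtype.storm.tuple.Tuple;
    import org.elasticsearch.action.bulk.BulkRequestBuilder;
    import org.elasticsearch.client.Client;
    import org.elasticsearch.node.NodeBuilder;

    import java.util.Map;

    public class IndexingBolt extends BaseRichBolt {
        private transient Client client;
        private transient BulkRequestBuilder bulk;
        private OutputCollector collector;

        @Override
        public void prepare(Map conf, TopologyContext context, OutputCollector collector) {
            this.collector = collector;
            // NodeClient joining the cluster as a non-data client node
            this.client = NodeBuilder.nodeBuilder()
                    .clusterName("metrics-datastore")   // illustrative cluster name
                    .client(true).data(false)
                    .node().client();
            this.bulk = client.prepareBulk();
        }

        @Override
        public void execute(Tuple tuple) {
            String json = tuple.getStringByField("doc");
            bulk.add(client.prepareIndex("metrics", "doc").setSource(json)); // placeholder index/type
            if (bulk.numberOfActions() >= 10000) {  // flush in batches of roughly 8k-12k documents
                bulk.execute();                     // sent without waiting for the response
                bulk = client.prepareBulk();
            }
            collector.ack(tuple);
        }

        @Override
        public void declareOutputFields(OutputFieldsDeclarer declarer) {}
    }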

Indexing was consistent at around 3,500 documents per second for about 6 hours, after which GC issues interrupted the momentum and the cluster eventually ended up in this state.

Thanks @warkolm. Based on my previous comment with details on the versions and setup, what cluster sizing do you suggest? Do you have any references to cluster configurations that I can consult? I know it depends on my data, but I just wanted a starting point.

Your bulk sizing is too large; we generally recommend a payload of around 5MB per bulk request. I'd try reducing it to see if that helps.
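
For illustration, here is a minimal sketch of how a client could cap the payload per bulk request, assuming the BulkProcessor helper from the 1.x Java client (the thresholds and the listener body are placeholders, not a drop-in replacement for your bolts):

    import org.elasticsearch.action.bulk.BulkProcessor;
    import org.elasticsearch.action.bulk.BulkRequest;
    import org.elasticsearch.action.bulk.BulkResponse;
    import org.elasticsearch.client.Client;
    import org.elasticsearch.common.unit.ByteSizeUnit;
    import org.elasticsearch.common.unit.ByteSizeValue;

    public class CappedBulkIndexer {
        public static BulkProcessor build(Client client) {
            return BulkProcessor.builder(client, new BulkProcessor.Listener() {
                @Override
                public void beforeBulk(long executionId, BulkRequest request) {}

                @Override
                public void afterBulk(long executionId, BulkRequest request, BulkResponse response) {
                    if (response.hasFailures()) {
                        // log response.buildFailureMessage() and retry or slow down
                    }
                }

                @Override
                public void afterBulk(long executionId, BulkRequest request, Throwable failure) {
                    // the whole bulk request failed, e.g. node disconnected
                }
            })
            .setBulkSize(new ByteSizeValue(5, ByteSizeUnit.MB)) // flush once ~5MB of payload has accumulated
            .setBulkActions(2000)                               // or after this many documents, whichever comes first
            .setConcurrentRequests(1)                           // at most one bulk in flight -> back-pressure
            .build();
        }
    }

Documents would then be fed in with `processor.add(...)`, and the processor flushes automatically whenever either threshold is reached.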

Besides decreasing the bulk request size, you can also try increasing the maximum heap size, reducing the maximum segment merge size, and switching to a more aggressive GC. Your logs show that bulk indexing continues (and even times out at the socket level) while what looks like a large segment merge is starting. This can also mean that your node clients do not wait for the bulk request responses before sending the next batch, which is fatal in the long run: garbage collection on the node heap goes through the roof, effectively cutting down resources until bulk messages fail due to low memory.
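
As a sketch (assuming the 1.x Java client API; the index and type names are placeholders), a client that waits for each bulk response before sending the next batch could look like this:

    import org.elasticsearch.action.bulk.BulkRequestBuilder;
    import org.elasticsearch.action.bulk.BulkResponse;
    import org.elasticsearch.client.Client;

    import java.util.List;

    public class BlockingBulk {
        // Sends one batch and blocks until the cluster has acknowledged it.
        public static void indexBatch(Client client, List<String> jsonDocs) {
            BulkRequestBuilder bulk = client.prepareBulk();
            for (String json : jsonDocs) {
                bulk.add(client.prepareIndex("metrics", "doc").setSource(json)); // placeholder index/type
            }
            BulkResponse response = bulk.execute().actionGet(); // wait for the response
            if (response.hasFailures()) {
                // back off and retry the failed items instead of piling on more bulks
            }
        }
    }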

Thanks @jprante @warkolm. I'm starting a test run with these suggestions and will update you with the observations.

The heap size is currently 3750MB (half of the available RAM). Are you suggesting moving to an instance with more memory? I'll use a maximum merged segment size of 2GB.
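
For the record, a sketch of how that cap might be applied from the Java client, assuming the default tiered merge policy and its `index.merge.policy.max_merged_segment` setting (the index name is a placeholder):

    import org.elasticsearch.client.Client;
    import org.elasticsearch.common.settings.ImmutableSettings;

    public class MergeSettings {
        // Caps the largest segment the tiered merge policy will produce for the given index.
        public static void capMergedSegmentSize(Client client, String index) {
            client.admin().indices()
                  .prepareUpdateSettings(index)
                  .setSettings(ImmutableSettings.settingsBuilder()
                          .put("index.merge.policy.max_merged_segment", "2gb"))
                  .execute().actionGet();
        }
    }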

Are you suggesting blocking until a bulk request completes before issuing the next one, instead of issuing them in parallel? In that case, I believe we also need to find the right number of concurrent clients.

By the way, if it helps, I have the segments and fielddata output in case you want to take a look.

So, the test ended with heap space errors, preceded by a lot of NodeDisconnectedExceptions, ClusterBlockExceptions, EsRejectedExecutionExceptions, and RejectedExecutionExceptions. After a cluster-wide restart there were a number of FileNotFoundExceptions looking for segment-related files.

At first it seemed to work fine, with CPU utilization higher than in the last test, which I attributed to GC happening more often. The test ran for 7 hours consuming data at around 3,000 documents per second, and then there was a sudden drop.

Here is the segment info.