Size exceeds Integer.MAX_VALUE

@costin
I'm running Spark on YARN and hit this error every time I run against a big Elasticsearch index:

15/09/30 17:57:21 ERROR shuffle.OneForOneBlockFetcher: Failed while starting block fetches
java.lang.RuntimeException: java.lang.IllegalArgumentException: Size exceeds Integer.MAX_VALUE
	at sun.nio.ch.FileChannelImpl.map(FileChannelImpl.java:836)
	at org.apache.spark.storage.DiskStore$$anonfun$getBytes$2.apply(DiskStore.scala:125)
	at org.apache.spark.storage.DiskStore$$anonfun$getBytes$2.apply(DiskStore.scala:113)
	at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1206)
	at org.apache.spark.storage.DiskStore.getBytes(DiskStore.scala:127)
	at org.apache.spark.storage.DiskStore.getBytes(DiskStore.scala:134)
	at org.apache.spark.storage.BlockManager.doGetLocal(BlockManager.scala:512)
	at org.apache.spark.storage.BlockManager.getBlockData(BlockManager.scala:302)
	at org.apache.spark.network.netty.NettyBlockRpcServer$$anonfun$2.apply(NettyBlockRpcServer.scala:57)
	at org.apache.spark.network.netty.NettyBlockRpcServer$$anonfun$2.apply(NettyBlockRpcServer.scala:57)
	at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244)
	at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244)
	at scala.collection.IndexedSeqOptimized$class.foreach(IndexedSeqOptimized.scala:33)
	at scala.collection.mutable.ArrayOps$ofRef.foreach(ArrayOps.scala:108)
	at scala.collection.TraversableLike$class.map(TraversableLike.scala:244)
	at scala.collection.mutable.ArrayOps$ofRef.map(ArrayOps.scala:108)
	at org.apache.spark.network.netty.NettyBlockRpcServer.receive(NettyBlockRpcServer.scala:57)
	at org.apache.spark.network.server.TransportRequestHandler.processRpcRequest(TransportRequestHandler.java:114)
	at org.apache.spark.network.server.TransportRequestHandler.handle(TransportRequestHandler.java:87)
	at org.apache.spark.network.server.TransportChannelHandler.channelRead0(TransportChannelHandler.java:101)
	at org.apache.spark.network.server.TransportChannelHandler.channelRead0(TransportChannelHandler.java:51)
	at io.netty.channel.SimpleChannelInboundHandler.channelRead(SimpleChannelInboundHandler.java:105)
	at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:308)
	at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:294)
	at io.netty.handler.timeout.IdleStateHandler.channelRead(IdleStateHandler.java:266)
	at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:308)
	at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:294)
	at io.netty.handler.codec.MessageToMessageDecoder.channelRead(MessageToMessageDecoder.java:103)
	at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:308)
	at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:294)
	at io.netty.handler.codec.ByteToMessageDecoder.channelRead(ByteToMessageDecoder.java:244)
	at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:308)
	at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:294)
	at io.netty.channel.DefaultChannelPipeline.fireChannelRead(DefaultChannelPipeline.java:846)
	at io.netty.channel.nio.AbstractNioByteChannel$NioByteUnsafe.read(AbstractNioByteChannel.java:131)
	at io.netty.channel.nio.NioEventLoop.processSelectedKey(NioEventLoop.java:511)
	at io.netty.channel.nio.NioEventLoop.processSelectedKeysOptimized(NioEventLoop.java:468)
	at io.netty.channel.nio.NioEventLoop.processSelectedKeys(NioEventLoop.java:382)
	at io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:354)
	at io.netty.util.concurrent.SingleThreadEventExecutor$2.run(SingleThreadEventExecutor.java:111)
	at java.lang.Thread.run(Thread.java:745)

	at org.apache.spark.network.client.TransportResponseHandler.handle(TransportResponseHandler.java:162)
	at org.apache.spark.network.server.TransportChannelHandler.channelRead0(TransportChannelHandler.java:103)
	at org.apache.spark.network.server.TransportChannelHandler.channelRead0(TransportChannelHandler.java:51)
	at io.netty.channel.SimpleChannelInboundHandler.channelRead(SimpleChannelInboundHandler.java:105)
	

Someone mentioned on Stack Overflow that increasing the number of partitions fixes the problem. The question is: how do I increase the number of partitions?

See also https://www.mail-archive.com/user@spark.apache.org/msg32370.html

Thanks,

Currently we don't offer a way to increase the partitions beyond the number of shards. There might be a way to achieve this reliably; see this issue: https://github.com/elastic/elasticsearch-hadoop/issues/528
It's likely, though, that this will be addressed in 2.2.
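
As a rough illustration of the partition-count arithmetic behind the Stack Overflow suggestion (not something the connector does for you, per the reply above), the sketch below estimates how many partitions would keep the average partition below a chosen size. The function name min_partitions, the 512 MiB default target, and the 100 GiB input are assumptions made for this example; the only fixed number is Java's Integer.MAX_VALUE (2^31 - 1 bytes), the limit named in the stack trace.

```python
INT_MAX = 2**31 - 1  # Java's Integer.MAX_VALUE; as a byte count, just under 2 GiB


def min_partitions(total_bytes: int, target_bytes: int = 512 * 1024**2) -> int:
    """Return the smallest partition count whose average partition size
    is at most target_bytes. Hypothetical helper, not part of Spark or
    elasticsearch-hadoop."""
    if total_bytes < 0:
        raise ValueError("total_bytes must be non-negative")
    if not 0 < target_bytes <= INT_MAX:
        raise ValueError("target_bytes must be in (0, Integer.MAX_VALUE]")
    # Integer ceiling division; always use at least one partition.
    return max(1, -(-total_bytes // target_bytes))


if __name__ == "__main__":
    # Assumed example: roughly 100 GiB of data read from the index.
    print(min_partitions(100 * 1024**3))
```

If it helps at all, the resulting number would be passed to Spark's RDD.repartition after the data is read. Whether that avoids the error in a given job depends on where the oversized block is produced, and skewed data can still leave individual partitions well above the average.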

@costin
Quick question: does it take the number of replica shards into consideration, or just the primary shards?
e.g.
primary shards (8) + replica shards (8) = 16 partitions, or
just primary shards = 8 partitions

"Currently we don't offer a way to increase the partitions beyond the number of shards."

Can we include that in the documentation? I've been trying to figure out how to set the number of partitions, and I only just found out that it is the number of shards.

Replicas are copies of data. If both a primary and its replicas were considered, one would consume duplicate data. Furthermore, it would not help your case, since the issue is that a shard (replica or not) is larger than 2GB.
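
The shard arithmetic from the question can be written out as a small sketch. It assumes, as stated above, one partition per primary shard with replicas ignored; the function names and the shard sizes are made up for illustration and are not part of any Elasticsearch or Spark API.

```python
INT_MAX = 2**31 - 1  # Java's Integer.MAX_VALUE, the limit from the stack trace


def partition_count(primary_shards: int, replicas_per_primary: int) -> int:
    """Partitions created for a read, assuming one per primary shard.
    Replicas hold the same documents, so they add no partitions and
    replicas_per_primary is deliberately ignored in the result."""
    if primary_shards < 1 or replicas_per_primary < 0:
        raise ValueError("need at least one primary and no negative replica count")
    return primary_shards


def oversized_shards(shard_bytes: dict) -> list:
    """Names of shards whose size exceeds Integer.MAX_VALUE bytes."""
    return sorted(name for name, size in shard_bytes.items() if size > INT_MAX)


if __name__ == "__main__":
    # The case from the question: 8 primaries, each with 1 replica.
    print(partition_count(8, 1))
    # Hypothetical shard sizes in bytes.
    sizes = {"shard-0": 3 * 1024**3, "shard-1": 1 * 1024**3}
    print(oversized_shards(sizes))
```

Under these assumptions, adding replicas leaves the partition count at 8, and any shard flagged by the second function stays oversized no matter how many copies of it exist.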

As for the doc improvement: sure, sounds like a good idea. Can you please create an issue so we don't forget about it?

Thanks,

Got you, thanks!

Done: "Number of partitions in RDD", Issue #564 in elastic/elasticsearch-hadoop on GitHub.

Cheers!