Understanding "es.input.max.docs.per.partition" configuration

I am using elasticsearch-hadoop 5.6.2 to read Elasticsearch 5.6.1 indices and aliases with Spark 2.3. The ES indices are configured with 5 primary shards and 1 replica.

In my case, the RDD returned by JavaEsSpark.esRDD() is heavily imbalanced in terms of how much data each partition holds. Below is an example of the number of documents per RDD partition:

partition number: 0, document count: 0
partition number: 1, document count: 0
partition number: 2, document count: 0
partition number: 3, document count: 355326
partition number: 4, document count: 355518
partition number: 5, document count: 0
partition number: 6, document count: 0
partition number: 7, document count: 0
partition number: 8, document count: 0
partition number: 9, document count: 355436
partition number: 10, document count: 356470
partition number: 11, document count: 0
partition number: 12, document count: 356520
partition number: 13, document count: 0
partition number: 14, document count: 0

Only 5 partitions (which seem to correspond to the 5 ES shards) contain data; the other partitions are empty.
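
For context, here is a minimal sketch of how per-partition counts like the above can be produced. The index name, es.nodes endpoint, and app name are placeholders I have assumed, not values from the original post:

```java
import java.util.Collections;
import java.util.Map;

import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.elasticsearch.spark.rdd.api.java.JavaEsSpark;

public class EsPartitionCounts {
    public static void main(String[] args) {
        SparkConf conf = new SparkConf()
                .setAppName("es-partition-counts")   // placeholder app name
                .set("es.nodes", "localhost:9200");  // placeholder ES endpoint
        JavaSparkContext sc = new JavaSparkContext(conf);

        // "my-index" is a placeholder; JavaEsSpark.esRDD returns a pair RDD
        // of (document id, source map).
        JavaPairRDD<String, Map<String, Object>> rdd =
                JavaEsSpark.esRDD(sc, "my-index");

        // Count the documents held by each RDD partition.
        rdd.mapPartitionsWithIndex((idx, it) -> {
            long count = 0;
            while (it.hasNext()) {
                it.next();
                count++;
            }
            return Collections.singletonList(
                    "partition number: " + idx + ", document count: " + count).iterator();
        }, true).collect().forEach(System.out::println);

        sc.stop();
    }
}
```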

I am using the default value (100000) for the es.input.max.docs.per.partition config. My understanding of this config was that, with the default value, no RDD partition should contain more than 100000 documents; with roughly 355,000 documents per shard, I would therefore expect each shard to be split into about four slices, each under the cap. So I am wondering whether my understanding of this config is incorrect, or whether this is a bug.

The creation of empty slices is a known bug in Elasticsearch. Newer versions should have a fix that keeps it from occurring. If you are not able to upgrade to a more recent version, you should be able to disable the broken slicing by setting es.input.max.docs.per.partition to a very large number like Long.MAX_VALUE, or, if you are on a version of ES-Hadoop higher than 6.2.3, by setting es.input.use.sliced.partitions to false.
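
A minimal sketch of both workarounds (the app name and es.nodes endpoint are placeholders, not from the thread):

```java
import org.apache.spark.SparkConf;

// Sketch: disabling the broken slicing when stuck on an affected version.
SparkConf conf = new SparkConf()
        .setAppName("es-read-no-slicing")    // placeholder app name
        .set("es.nodes", "localhost:9200")   // placeholder ES endpoint
        // Workaround 1: make the per-partition cap effectively unreachable,
        // so the connector falls back to one input partition per shard.
        .set("es.input.max.docs.per.partition", String.valueOf(Long.MAX_VALUE));

// Workaround 2 (ES-Hadoop newer than 6.2.3): disable sliced partitions directly.
// conf.set("es.input.use.sliced.partitions", "false");
```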


Additionally, in 7.0.0 the default value for max docs per partition will be removed, and the connector will once again default to creating splits based only on shard counts. Slicing can still be enabled by setting a value for the max docs per partition setting; it will simply be left unset and disabled by default.
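
So on 7.x, opting back in would look something like this (again a sketch; the value shown just mirrors the old 100000 default):

```java
import org.apache.spark.SparkConf;

// Sketch: on ES-Hadoop 7.x slicing is off by default; setting the cap
// explicitly opts back in to sliced reads.
SparkConf conf = new SparkConf()
        .set("es.input.max.docs.per.partition", "100000"); // explicit opt-in
```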

