Spark: optimize partitioning

Hi,
In Spark, I am reading data from Elasticsearch. I need to group documents by the value of a specific field and then map each group through a function. My mapper therefore expects a list of documents in which one given field has the same value for every document.

Here is an example of what I am trying to optimize.

I have the following documents in an index:

[{
    "user" : "kimchy",
    "message" : "trying out Elasticsearch"
},
{
    "user" : "kimchy",
    "message" : "hello world"
},
{
    "user" : "michelle",
    "message" : "Hi world"
}]

I used user as the routing field so that all messages from a given user end up in the same shard.

In Spark, I am reading this index and I want to partition the messages per user (and not only per shard).

My mapper function is something like

def mapper(messages, user):
    # If user is None, get it from the first message
    # 1. compute some data from those messages
    # 2. save some data to ES

For now, this is what I am doing in pyspark:

df = sqlContext.read.format("org.elasticsearch.spark.sql").load("tweet/message")
rdd = df.rdd.groupBy(lambda x: x.user)
# groupBy yields (user, iterable-of-rows) pairs, so map each pair into the mapper
rdd.map(lambda kv: mapper(list(kv[1]), kv[0])).reduce(reducer)

Is there anything I can do to optimize the partitioning?

As it stands we do not support defining partitions by a field value in the connector. It's unlikely that this will be supported going forward as it's a very niche feature. From what you are describing though, it seems that this is a decent approach for making sure that all user information is available per partition. Adding parent-child, nested or join fields will probably make the performance worse in terms of querying the data, so I'd advise against that.
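For reference, here is a plain-Python model of the group-then-map pattern described above. It is only a sketch of the semantics: the sample documents stand in for the Elasticsearch index from the question, and the `mapper` body (a simple message count) is a placeholder for the real computation.

```python
from itertools import groupby
from operator import itemgetter

# Sample documents standing in for the Elasticsearch index.
docs = [
    {"user": "kimchy", "message": "trying out Elasticsearch"},
    {"user": "kimchy", "message": "hello world"},
    {"user": "michelle", "message": "Hi world"},
]

def mapper(messages, user):
    # Placeholder for the real computation over one user's messages.
    return (user, len(messages))

# RDD.groupBy(lambda x: x.user) produces (user, iterable-of-docs) pairs;
# itertools.groupby models the same thing (it needs the input sorted by key).
pairs = [
    (user, list(group))
    for user, group in groupby(sorted(docs, key=itemgetter("user")),
                               key=itemgetter("user"))
]

results = [mapper(msgs, user) for user, msgs in pairs]
print(results)  # [('kimchy', 2), ('michelle', 1)]
```

In pyspark the equivalent step is `rdd.map(lambda kv: mapper(list(kv[1]), kv[0]))` after the `groupBy`, since each element of the grouped RDD is a (key, iterable) pair rather than a bare document.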

Thank you for your answer.