Spark: optimize partitioning

Hi,
In Spark, I am reading data from Elasticsearch. I need to group documents by the value of a specific field in order to map each group to a function. My mapper therefore expects a list of documents that all have the same value for that field.

Here is an example of what I am trying to optimize.

I have the following documents in an index:

[{
    "user" : "kimchy",
    "message" : "trying out Elasticsearch"
},
{
    "user" : "kimchy",
    "message" : "hello world"
},
{
    "user" : "michelle",
    "message" : "Hi world"
}]

I used user as the routing value, so that all messages from a given user end up in the same shard.
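
For context, here is roughly how a message is indexed with routing (a minimal sketch using the elasticsearch-py client; the example document is just a placeholder):

from elasticsearch import Elasticsearch

es = Elasticsearch()  # defaults to localhost:9200

doc = {"user": "kimchy", "message": "trying out Elasticsearch"}

# Routing on the user keeps all of this user's messages on the same shard
es.index(index="tweet", doc_type="message", body=doc, routing=doc["user"])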

In Spark, I am reading this index and I want to partition messages per user (and not only per shard).

My mapper function is something like

def mapper(messages, user):
    # If user is None, get it from the first message
    # 1. compute some data with those messages
    # 2. save some data to ES

For now, this is what I am doing in PySpark:

df = sqlContext.read.format("org.elasticsearch.spark.sql").load("tweet/message")
rdd = df.rdd.groupBy(lambda x: x.user)
rdd.mapPartitions(mapper, preservesPartitioning=True).reduce(reducer)
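
Note that since groupBy produces (user, messages) pairs, the function handed to mapPartitions receives an iterator over those pairs rather than separate (messages, user) arguments. A minimal sketch of the mapper in that shape (the yielded value is only illustrative):

def mapper(partition):
    # Each element of the partition is a (user, messages) pair from groupBy
    for user, messages in partition:
        messages = list(messages)
        # 1. compute some data with those messages
        # 2. save some data to ES
        yield (user, len(messages))  # illustrative result for the reducer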

Is there anything I can do to optimize the partitioning?

As it stands, we do not support defining partitions by a field value in the connector, and it's unlikely that this will be supported going forward, as it's a very niche feature. From what you are describing, though, this seems like a decent approach for making sure that all of a user's data is available per partition. Adding parent-child, nested, or join fields would probably make querying the data slower, so I'd advise against that.
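
If you want a bit more control over the shuffle itself, one option is to key the RDD by user and hash-partition it explicitly before grouping, so that every user's messages land in the same Spark partition. A minimal sketch, assuming the same field and variable names as above (the partition count of 200 is just an example):

df = sqlContext.read.format("org.elasticsearch.spark.sql").load("tweet/message")

# Key every row by its user and hash-partition on that key
keyed = df.rdd.map(lambda row: (row.user, row))
partitioned = keyed.partitionBy(200)

# With the same partition count, groupByKey should reuse the existing
# partitioning instead of shuffling the data a second time
grouped = partitioned.groupByKey(200)
grouped.mapPartitions(mapper, preservesPartitioning=True).reduce(reducer)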

Thank you for your answer.
