Hi,
In Spark, I am reading data from Elasticsearch. I need to group documents by the value of a specific field so that I can map each group to a function. My mapper therefore expects a list of documents that all share the same value for one of their fields.
Here is an example of what I am trying to optimize.
I have the following documents in an index:
[{
    "user" : "kimchy",
    "message" : "trying out Elasticsearch"
},
{
    "user" : "kimchy",
    "message" : "hello world"
},
{
    "user" : "michelle",
    "message" : "Hi world"
}]
I used the user field as the routing key so that all messages of a user end up in the same shard.
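For reference, the documents were indexed roughly like this (shown here with the elasticsearch-py client for illustration; the tweet index and message type match the load path below):

from elasticsearch import Elasticsearch

es = Elasticsearch()

docs = [
    {"user": "kimchy", "message": "trying out Elasticsearch"},
    {"user": "kimchy", "message": "hello world"},
    {"user": "michelle", "message": "Hi world"},
]

for doc in docs:
    # routing on the user field keeps all of a user's messages in one shard
    es.index(index="tweet", doc_type="message", body=doc, routing=doc["user"])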
In Spark, I am reading this index and I want to partition messages per user (not only per shard).
My mapper function is something like:

def mapper(messages, user):
    # if user is None, get it from the first message
    # 1. compute some data with those messages
    # 2. save some data to ES
    pass
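For illustration, here is a minimal sketch of such a mapper; the message count is a hypothetical placeholder for the real computation, and the tweet/stats target is made up:

def mapper(messages, user=None):
    from elasticsearch import Elasticsearch

    messages = list(messages)
    if user is None:
        # fall back to the user field of the first message
        user = messages[0]["user"]

    # 1. compute some data with those messages
    #    (hypothetical placeholder: just count them)
    stats = {"user": user, "message_count": len(messages)}

    # 2. save some data to ES; the client is created inside the function
    #    so it can run on Spark executors without serializing a connection
    es = Elasticsearch()
    es.index(index="tweet", doc_type="stats", body=stats)
    return stats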
For now, this is what I am doing in PySpark:
df = sqlContext.read.format("org.elasticsearch.spark.sql").load("tweet/message")
# group rows by user, giving an RDD of (user, messages) pairs
rdd = df.rdd.groupBy(lambda x: x.user)
rdd.mapPartitions(mapper, preservesPartitioning=True).reduce(reducer)
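Strictly speaking, groupBy yields (user, messages) pairs and mapPartitions hands the function an iterator over those pairs, so the call above needs a small adapter along these lines (a sketch, reusing the mapper from above and the same reducer):

def partition_mapper(pairs):
    # each element of the partition is a (user, messages) pair from groupBy
    for user, messages in pairs:
        yield mapper(messages, user)

rdd.mapPartitions(partition_mapper, preservesPartitioning=True).reduce(reducer)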
Is there anything I can do to optimize the partitioning?