Aggregation running very slow

I'm using sparkSQL to query ES but aggregation (GROUP BY) is running very slow. I can't remember but seems I read that aggregation queries are also push down to ES. If that's the case, why is aggregation running so slow and how can I improve that ?

When in doubt, refer to the ES-Hadoop reference documentation. Spark performs group by aggregation by itself and does not expose a filter to be pushed down.
In other words there's nothing the connector can do for this operation.

@costin
I now understand the mechanism of how ES-Hadoop work, Thanks!

If I may ask, how can I get all my data indexed in Elasticsearch to Spark ?

I'm not sure if there is a better way than using spark sql to do a select * from BigIndex

That is as well explained in the docs - in fact Spark is the longest chapter in the reference documentation.

1 Like

@costin Can you please elaborate more on how aggregations are handled for the ES-SparkSQL integration. (Sorry I couldn't find it in the docs). For example what will happen if I will issue a following query:

SELECT termA, SUM(measure) from index
WHERE termB = 'XXX' and termC > 5
GROUP BY termA

As I understand it should happen as:

  1. ES issues query SELECT term, measure from index WHERE termB = 'XXX' and termC > 5 on every shard (using collocation where possible)
  2. This data is loaded into Spark on each node and there it's aggregated.

By this we benefit from ES index - we quickly get only filtered data and also spark computation for the GROUP BY operation.

Do I understand it correctly - can you please correct me if necessary.

Thanks,
Sergey.

https://www.elastic.co/guide/en/elasticsearch/hadoop/current/spark.html#spark-pushdown

P.S. In the future, please open a new thread instead of hijacking an existing one. Thanks!

@costin Thank you for the prompt response, I saw this documentation, but it doesn't say whether aggregations are supported by push-down or not (I assume that not). That's why I asked for the steps of GROUP BY query.

Thanks,
Sergey.

Point taken. I'll update the docs to make this clear.
Note that as I've indicated on this thread - the issue is that without hooks in Spark, aggregations-like operations are not possible since the storage (in this case Elastic) is unaware of what's going on.
Take for example - count. On an RDD level, the connector supports this operation and instead of iterating manually through the entire content, returns the count right away.
However on a DataFrame the API doesn't allow count to be pushed down so one will actually end up iterating over the entire data. This is true for aggregations, including group-by.

The pushdown however applies at query level, in particular SQL where the WHERE and SELECT are properly exposed and thus picked up by ES-Hadoop.

Maybe in the future, more and more operations will be exposed by Spark.