I'm using Spark SQL to query ES, but aggregation (GROUP BY) runs very slowly. I can't remember where, but I think I read that aggregation queries are also pushed down to ES. If that's the case, why is aggregation so slow, and how can I improve it?
When in doubt, refer to the ES-Hadoop reference documentation. Spark performs GROUP BY aggregation itself and does not expose a hook through which it could be pushed down.
In other words, there's nothing the connector can do for this operation.
@costin
I now understand how ES-Hadoop works. Thanks!
If I may ask, how can I load all the data indexed in Elasticsearch into Spark?
I'm not sure whether there is a better way than using Spark SQL to run a select * from BigIndex.
That is also explained in the docs; in fact, Spark is the longest chapter in the reference documentation.
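For readers who want a starting point, here is a minimal PySpark sketch of reading a whole index into a DataFrame. It assumes the ES-Hadoop Spark jar is on the Spark classpath. The node address, the port, and the helper names es_options and load_index are illustrative assumptions and not part of the connector; only the data source name and the option keys come from ES-Hadoop.

```python
def es_options(nodes="localhost", port=9200, pushdown=True):
    """Build connector options. The default values are assumptions for a local setup."""
    return {
        "es.nodes": nodes,                   # Elasticsearch host(s) to connect to
        "es.port": str(port),                # REST port
        "pushdown": str(pushdown).lower(),   # let the connector translate filters
    }


def load_index(spark, index, **kwargs):
    """Read an entire index as a DataFrame via the ES-Hadoop Spark SQL source.

    `spark` is expected to be a SparkSession; no SELECT * is needed because
    the returned DataFrame already represents the whole index.
    """
    reader = spark.read.format("org.elasticsearch.spark.sql")
    return reader.options(**es_options(**kwargs)).load(index)
```

With a real session this would be called as load_index(spark, "BigIndex"). Nothing is fetched until an action runs on the resulting DataFrame.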
@costin Can you please elaborate on how aggregations are handled by the ES-SparkSQL integration? (Sorry, I couldn't find it in the docs.) For example, what happens if I issue the following query:
SELECT termA, SUM(measure) from index
WHERE termB = 'XXX' and termC > 5
GROUP BY termA
As I understand it, it should happen as follows:
- ES runs the query
SELECT termA, measure from index WHERE termB = 'XXX' and termC > 5
on every shard (using co-location where possible).
- This data is loaded into Spark on each node and aggregated there.
This way we benefit from the ES index, since only the filtered data is fetched quickly, and from Spark's computation for the GROUP BY operation.
Do I understand this correctly? Please correct me if necessary.
Thanks,
Sergey.
https://www.elastic.co/guide/en/elasticsearch/hadoop/current/spark.html#spark-pushdown
P.S. In the future, please open a new thread instead of hijacking an existing one. Thanks!
@costin Thank you for the prompt response. I saw this documentation, but it doesn't say whether aggregations are supported by push-down or not (I assume not). That's why I asked for the steps of a GROUP BY query.
Thanks,
Sergey.
Point taken. I'll update the docs to make this clear.
Note that, as I've indicated in this thread, the issue is that without hooks in Spark, aggregation-like operations cannot be pushed down, since the storage (in this case Elastic) is unaware of what's going on.
Take count, for example. At the RDD level, the connector supports this operation and, instead of iterating manually through the entire content, returns the count right away.
However, on a DataFrame the API doesn't allow count to be pushed down, so one will actually end up iterating over the entire data. This is true for aggregations, including group-by.
The pushdown, however, applies at query level, in particular in SQL, where the WHERE and SELECT clauses are properly exposed and thus picked up by ES-Hadoop.
Maybe in the future, more and more operations will be exposed by Spark.
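To make the division of work in this thread concrete, here is a toy model in plain Python. It is not the connector and uses no Spark or Elasticsearch API. The shard contents and the helper names scan_shard, group_sum and run_query are made up for illustration; the field names follow the query asked about above. The filter and the column projection run on the "storage" side, while the GROUP BY runs on the "Spark" side over whatever rows come back.

```python
from collections import defaultdict

# Hypothetical documents spread over two "shards".
SHARDS = [
    [
        {"termA": "a", "termB": "XXX", "termC": 7, "measure": 10},
        {"termA": "b", "termB": "YYY", "termC": 9, "measure": 5},
    ],
    [
        {"termA": "a", "termB": "XXX", "termC": 6, "measure": 1},
        {"termA": "b", "termB": "XXX", "termC": 3, "measure": 4},
        {"termA": "b", "termB": "XXX", "termC": 8, "measure": 2},
    ],
]


def scan_shard(shard, predicate, fields):
    """Stand-in for the pushed-down part: WHERE and SELECT applied at the storage."""
    return [{f: doc[f] for f in fields} for doc in shard if predicate(doc)]


def group_sum(rows, key, value):
    """Stand-in for what Spark does itself: GROUP BY key with SUM(value)."""
    totals = defaultdict(int)
    for row in rows:
        totals[row[key]] += row[value]
    return dict(totals)


def run_query(shards):
    """Filter and project per shard, then aggregate the transferred rows."""
    def predicate(doc):
        return doc["termB"] == "XXX" and doc["termC"] > 5

    rows = []
    for shard in shards:
        rows.extend(scan_shard(shard, predicate, ["termA", "measure"]))
    return rows, group_sum(rows, "termA", "measure")


rows, result = run_query(SHARDS)
print(len(rows), result)  # 3 {'a': 11, 'b': 2}
```

Only three of the five documents leave the "storage", and each carries just two fields, but every one of those rows still has to be transferred before the sum can be computed. That is why a GROUP BY over a large filtered set stays slow.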