Slow performance of Elasticsearch-Hadoop + Spark SQL

Hi Costin,

Thank you for the quick reply and the detailed explanation.

I understand that working with Elasticsearch via Spark SQL cannot be free, and it makes sense that performance would be somewhat slower compared to curl, but the actual performance is many times slower.
I turned on logging to see the REST communication while processing a query submitted through Spark SQL, and saw that for a query with "where" conditions the filters are not pushed down to Elasticsearch.
The lines below show the query that was executed and the "match_all" REST traces from the log file for that Spark SQL query; they show that no filtering is done in the REST requests.
The log file also displays the documents returned by the REST calls, and many of them do not meet the conditions of the SQL query.

select EventCount, Hour
from intervals
where User = 'Robert Greene'
and DataStore = 'PROD_HK_HR'
and EventAffectedCount = 56;

2015-06-03 11:24:54 CommonsHttpTransport [TRACE] Tx [POST]@[192.168.226.133:9200][s@s_summary/intervals/_search?search_type=scan&scroll=5&size=5000&preference=_shards:4;_only_node:A_macmMWT7u2ZnGqCyFiKA] w/ payload [{"query":{"match_all":{}}}]
2015-06-03 11:24:54 CommonsHttpTransport [TRACE] Tx [POST]@[192.168.226.133:9200][s@s_summary/intervals/_search?search_type=scan&scroll=5&size=5000&preference=_shards:3;_only_node:A_macmMWT7u2ZnGqCyFiKA] w/ payload [{"query":{"match_all":{}}}]
2015-06-03 11:24:54 CommonsHttpTransport [TRACE] Tx [POST]@[192.168.226.133:9200][s@s_summary/intervals/_search?search_type=scan&scroll=5&size=5000&preference=_shards:0;_only_node:A_macmMWT7u2ZnGqCyFiKA] w/ payload [{"query":{"match_all":{}}}]
2015-06-03 11:24:54 CommonsHttpTransport [TRACE] Tx [POST]@[192.168.226.133:9200][s@s_summary/intervals/_search?search_type=scan&scroll=5&size=5000&preference=_shards:1;_only_node:A_macmMWT7u2ZnGqCyFiKA] w/ payload [{"query":{"match_all":{}}}]
2015-06-03 11:24:54 CommonsHttpTransport [TRACE] Tx [POST]@[192.168.226.133:9200][s@s_summary/intervals/_search?search_type=scan&scroll=5&size=5000&preference=_shards:2;_only_node:A_macmMWT7u2ZnGqCyFiKA] w/ payload [{"query":{"match_all":{}}}]

The elasticsearch-hadoop documentation says that when the Elasticsearch source is defined using the esDF function with a query, the filter is pushed down to Elasticsearch.
I tried this approach using the definition below, ran the same SQL query against it, and it completed in less than a second.

val intv = sqlContext.esDF("s@s_summary/intervals", "?q=\"Robert Greene\"")
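
For reference, the query argument to esDF can presumably also be a full query DSL string rather than a URI query; the sketch below is only illustrative (it reuses the filters from the curl query in my first post, quoted below, assumes import org.elasticsearch.spark.sql._ is in scope, and I haven't benchmarked this form):

// Illustrative sketch only: the same three conditions as the curl query quoted below,
// passed to esDF as a query DSL string instead of a URI query.
// Assumes import org.elasticsearch.spark.sql._ is in scope.
val dslQuery = """{"query":{"filtered":{"query":{"match_all":{}},"filter":{"bool":{"must":[
  {"term":{"User":"Robert Greene"}},
  {"term":{"DataStore":"PROD_HK_HR"}},
  {"term":{"EventAffectedCount":56}}
]}}}}}"""
val intvFiltered = sqlContext.esDF("s@s_summary/intervals", dslQuery)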

From these experiments it seems that the main reason for the slow performance in Spark SQL is that filters are not pushed down to Elasticsearch (unless the esDF function is used with a fixed query), so es-hadoop brings the whole data set into Spark and applies the filtering there.
Can you please confirm that?
Are there any ways to improve the performance in Spark SQL given this behaviour besides adding more hardware?

Thanks,

Dmitriy Fingerman

On Monday, June 1, 2015 at 5:40:32 PM UTC-4, Costin Leau wrote:

The best way is to use a profiler to understand where time is spent.
Spark, while significantly faster than Hadoop, cannot compete with curl.
The latter is a simple REST connection - the former involves a JVM, Scala, Akka and Spark,
which trigger es-hadoop, which makes parallel calls against all the nodes, retrieves the data in JSON format,
converts it into Scala/Java and applies a schema on top for Spark SQL to run with.

If you turn on logging, you'll see that in fact there are multiple REST/curl calls made by es-hadoop.
With the JVM/Scala warmed up, you should see less than 15s; however, it depends on how much hardware you have available.
Note that the curl comparison is not really fair - adding a SQL layer on top of that is bound to cost you something.


On 6/1/15 8:47 PM, Dmitriy Fingerman wrote:
> Hi,
>
> I see a big difference in performance of the same query expressed via Spark SQL and CURL.
> In CURL the query runs in less than a second, and in Spark SQL it runs 15 seconds.
> The index/type which I am querying contains 1M documents.
> Can you please explain why there is such a big difference in performance?
> Are there any ways to tune performance of Elasticsearch + Spark SQL?
>
> Environment: (everything is running on the same box):
>      Elasticsearch 1.4.4
>      elasticsearch-hadoop 2.1.0.BUILD-SNAPSHOT
>      Spark 1.3.0.
>
> CURL:
>
> curl -XPOST "http://localhost:9200/summary/intervals/_search" -d'
> {
>      "query" : {
>          "filtered" : {
>              "query" : { "match_all" : {}},
>               "filter" : {
>                  "bool" : {
>                      "must" : [
>                          {
>                              "term" : { "User" : "Robert Greene" }
>                          },
>                          {
>                              "term" : { "DataStore" : "PROD_HK_HR" }
>                          },
>                          {
>                              "term" : { "EventAffectedCount" : 56 }
>                          }
>                      ]
>                  }
>              }
>          }
>      }
> }'
>
> Spark:
>
>      val sparkConf = new SparkConf().setAppName("Test1")
>
>      // increasing scroll size to 5000 from the default 50 improved performance by 2.5 times
>      sparkConf.set("es.scroll.size", "5000")
>
>      val sc =  new SparkContext(sparkConf)
>      val sqlContext = new SQLContext(sc)
>
>      val intv = sqlContext.esDF("summary/intervals")
>      intv.registerTempTable("INTERVALS")
>
>      val intv2 = sqlContext.sql("select EventCount, Hour      " +
>                                        "from intervals               " +
>                                        "where User = 'Robert Greene' " +
>                                        "and DataStore = 'PROD_HK_HR' " +
>                                        "and EventAffectedCount = 56  ")
>      intv2.show(1000)
>

The final type of projection supported by Spark (filtering) is implemented on a separate branch; after cleaning up the current crop of bugs, it will be pushed to master. The existing query should already be applied - if that's not the case, it's a bug (which I'll investigate anyway).
The filter I'm talking about means modifying the user query based on the query created by Spark SQL itself.

And please do add some formatting to your posts - it's really easy and makes a big difference.

@dmitriyf By the way, the full pushdown functionality is now available in master and will be included in the next dev build (probably in the next couple of hours).

Hi Costin,

Thanks for the update.
I tried the most recent elasticsearch-hadoop-2.1.0.BUILD-SNAPSHOT.jar, but I don't see an improvement in performance.

The dependency that I used for build is:

<dependency>
	<groupId>org.elasticsearch</groupId>
	<artifactId>elasticsearch-hadoop</artifactId>
	<version>2.1.0.BUILD-SNAPSHOT</version>
</dependency>

Is the push-down functionality present there? If not, what dependency should I use to get the new functionality?

The functionality is there, and it should be obvious when you turn on logging, run the Spark SQL query and inspect the resulting query DSL.

I tried again and found that with the new es-hadoop 2.1.0.rc1 the pushdown is working and I see a huge improvement in query performance (from 20 seconds to less than a second). I just want to mention that it worked only for tables defined in the Spark SQL context through the sqlContext.load(...) function, not for those defined using the esDF(...) method: when using esDF, query performance was the same as it was without pushdown filtering.

Also, I tried to follow the example from the es-hadoop documentation for using non-default settings for pushdown filtering, and with this code I got compilation errors:

val sql = new SQLContext...
val options = Map("pushdown" -> "true", "nodes" -> "someNode", "esPort" -> "9200")

// Spark 1.3 style
val spark13DF = sql.load("spark/index", "org.elasticsearch.spark.sql", options)

The compilation error was:

error: overloaded method value load with alternatives:
[ERROR]   (source: String,schema: org.apache.spark.sql.types.StructType,options: scala.collection.immutable.Map[String,String])org.apache.spark.sql.DataFrame <and>
[ERROR]   (source: String,schema: org.apache.spark.sql.types.StructType,options: java.util.Map[String,String])org.apache.spark.sql.DataFrame
[ERROR]  cannot be applied to (String, String, scala.collection.immutable.Map[String,String])
[ERROR]     val hours = sqlContext.load("s@s_summary/hours", "org.elasticsearch.spark.sql", options)

That is expected. As both the es-hadoop docs and the Spark ones mention, the pushdown is available only for Spark sources; normal DataFrames do not fall under this category and thus will not get the proper filters applied to them.
Maybe in the future Spark SQL will extend this functionality, but as it stands right now the best way to work with DataFrames is through the sql.load and sql.read methods.

Indeed that's a bug, the correct snippet should be:

val sql = new SQLContext...
val options = Map("pushdown" -> "true", "nodes" -> "someNode", "esPort" -> "9200", "path" -> "spark/index")

// Spark 1.3 style
val spark13DF = sql.load("org.elasticsearch.spark.sql", options)

I'll update the documentation shortly.
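
To tie this back to the query from earlier in the thread, here is a rough sketch (not a verbatim doc example; the index, fields and SQLContext variable are taken from the snippets above) of how the resulting DataFrame would then be registered and queried, so that the pushed-down filters show up in the TRACE logs:

// Sketch only: register the pushdown-enabled DataFrame and run the earlier query;
// with pushdown working, the WHERE conditions should appear in the REST payloads.
val opts = Map("pushdown" -> "true", "path" -> "s@s_summary/intervals")
val intervals = sql.load("org.elasticsearch.spark.sql", opts)
intervals.registerTempTable("intervals")
sql.sql("select EventCount, Hour from intervals " +
        "where User = 'Robert Greene' and DataStore = 'PROD_HK_HR' " +
        "and EventAffectedCount = 56").show()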


@dmitriyf As a side note, I've responded to the issues you raised, in particular this one, and I'd like to know whether the problem still occurs for you or not. It would be great to review this and potentially have it addressed before GA.
Can you please retry the example using rc1?

Thanks,

Hi Costin,

This issue doesn't occur for me anymore, either with the BUILD-SNAPSHOT or with rc1.
Defining ES data sources with options is also working; the "path" and "pushdown" properties behave as expected.
The only thing that remains unclear to me is the "strict" property. I have not-analyzed fields, and with the "strict" property set to false I was expecting pushdown not to work, but it worked regardless of the strict property (I tried it with both false and true). I also tried "pushdown.strict" and "es.internal.spark.sql.pushdown.strict" but didn't see any difference.

Thanks

The difference is in whether the pushdown filters will be based on filters (like term or terms, which do exact matches) or on queries.
If your fields are not-analyzed this makes no difference; however, if the fields are analyzed, things are completely different.
Take airport codes, for example - if your doc contains "SFO" and the field is analyzed, you won't find the document when searching for it with strict (exact) matches.

P.S. There's no pushdown.strict or es.internal. - simply use strict.
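
In other words, it goes next to pushdown in the data source options. A minimal sketch, reusing the load-style snippet from above:

// Sketch: "strict" is passed the same way as "pushdown", as a data source option.
val opts = Map("pushdown" -> "true", "strict" -> "true", "path" -> "spark/index")
val strictDF = sql.load("org.elasticsearch.spark.sql", opts)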