Elasticsearch Spark - Tracing of Statement Scan And Scroll

I have found this question on SO and I'm curious about what do you think about that @costin ?

We are trying to integrate ES (1.7.2, 4 node cluster) with Spark (1.5.1, compiled with hive and hadoop with scala 2.11, 4 node cluster), there is hdfs coming into equation (hadoop 2.7,4 nodes) and thrift jdbc server and elasticsearch-hadoop-2.2.0-m1.jar

Thus, there are two ways of executing statement on ES.

I . Spark SQL with scala

val conf = new  SparkConf().setAppName("QueryRemoteES").setMaster("spark://node1:37077").set("spark.executor.memory","2g")
conf.set("spark.logConf", "true")
conf.set("spark.cores.max","20")
conf.set("es.index.auto.create", "false")
conf.set("es.batch.size.bytes", "100mb")
conf.set("es.batch.size.entries", "10000")
conf.set("es.scroll.size", "10000")
conf.set("es.nodes", "node2:39200")
conf.set("es.nodes.discovery","true")
conf.set("pushdown", "true")

sc.addJar("executorLib/elasticsearch-hadoop-2.2.0-m1.jar")
sc.addJar("executorLib/scala-library-2.10.1.jar")

sqlContext.sql("CREATE TEMPORARY TABLE geoTab USING org.elasticsearch.spark.sql OPTIONS (resource 'geo_2/kafkain')" )

val all: DataFrame = sqlContext.sql("SELECT count(*) FROM geoTab WHERE transmittersID='262021306841042'")

II. Thrift server (code executed on spark)

polledDataSource = new ComboPooledDataSource()
polledDataSource.setDriverClass("org.apache.hive.jdbc.HiveDriver")
polledDataSource.setJdbcUrl("jdbc:hive2://node1:30001")
polledDataSource.setMaxPoolSize(5)
dbConnection = polledDataSource.getConnection
dbStatement = dbConnection.createStatement

val dbResult = dbStatement.execute("CREATE TEMPORARY EXTERNAL TABLE IF NOT EXISTS geoDataHive6(transmittersID STRING,lat DOUBLE,lon DOUBLE) STORED BY 'org.elasticsearch.hadoop.hive.EsStorageHandler' TBLPROPERTIES('es.resource' = 'geo_2/kafkain','es.query'='{\"query\":{\"term\":{\"transmittersID\":\"262021306841042\"}}}','es.nodes'='node2','es.port'='39200','es.nodes.discovery' = 'false','es.mapping.include' = 'trans*,point.*','es.mapping.names' = 'transmittersID:transmittersID,lat:point.lat,lon:point.lon','pushdown' = 'true')")

dbStatement.setFetchSize(50000)
dbResultSet = dbStatement.executeQuery("SELECT count(*) FROM geoDataHive6")

I have following issues and due to fact that they are connected, I have decided to pack them into one question on stack:

It seems that method using Spark SQL supports pushdown of what goes behind WHERE (whether es.query is specified or not), time of execution is the same and is acceptable. But solution number 1 definitely does not support pushdow of aggregating functions, i.e. presented count(*) is not executed on side of ES, but only after all data are retrieved - ES returns rows and Spark SQL counts them. Please confirm if this is correct behavior

Solution number one behaves strange in that whether pushdown is passed true or false, time is equal

Solution number 2 seems to support no pushdown, it does not matter in what way I try to specify the sub-query, be it part of the table definition or in WHERE clause of the statement, it seems it is just fetching all the huge index and then to make the maths on it. Is it so that thrift-hive is not able to do pushdown on ES

I'd like to trace queries in elastic search, I do set following:

//logging.yml
index.search.slowlog: TRACE, index_search_slow_log_file
index.indexing.slowlog: TRACE, index_indexing_slow_log_file

additivity:

index.search.slowlog: true
index.indexing.slowlog: true

All index.search.slowlog.threshold.query,index.search.slowlog.threshold.fetch and even index.indexing.slowlog.threshold.index are set to 0ms. And I do see in slowlog file common statements executed from sense (so it works). But I don't see Spark SQL or thrift statements executed against ES. I suppose these are scan&scroll statement because if i execute scan&scroll from sense, these are also not logged. Is it possible somehow to trace scan&scroll on side of ES?


Here is the answer provided :

  1. As far as I know it is an expected behavior. All sources I know behave exactly the same way and intuitively it make sense. SparkSQL is designed for analytical queries and it make more sense to fetch data, cache and process locally. See also http://stackoverflow.com/q/32573991/1560062

  2. I don't think that conf.set("pushdown", "true") has any effect at all. If you want to configure connection specific settings it should be passed as an OPTION map as in the second case. Using es prefix should work as well

  3. This is strange indeed. Martin Senne reported a similar issue with PostgreSQL but I couldn't reproduce that.

There are a number of issues with your setup:

  1. You mention using Scala 2.11 but are using Scala 2.10. Note that if you want to pick your Scala version, elasticsearch-spark should be used, elasticsearch-hadoop provides binaries for Scala 2.10 only.

  2. The pushdown functionality is only available through Spark DataSource. If you are not using this type of declaration, the pushdown is not passed to ES (that's how Spark works). Hence declaring pushdown there is irrelevant.

  3. Notice how all params in ES-Hadoop start with es. - the only exceptions are pushdown and location which are Spark DataSource specific (following Spark conventions as these are Spark specific features in a dedicated DS)

  4. Using a temporary table does count as a DataSource however you need to use pushdown there. If you don't, it gets activated by default hence why you see no difference between your runs; you haven't changed any relevant param.

  5. Count and other aggregations are not pushed down by Spark. I've raised this up actually with the Databricks team some time ago and while there might be something in the future, there isn't anything currently. For count, you can do a quick call by using dataFrame.rdd.esCount. But it's an exceptional case.

  6. I'm not sure whether thrift server actually counts as a DataSource since it loads data from Hive. You can double check this by enabling logging on the org.elasticsearch.hadoop.spark package to DEBUG. You should see whether the SQL does get translated to the DSL.

Hope this helps,

Great answer thanks! Would you mind if I post it also in SO? :slight_smile: