Spark SQL filter pushdown

I cannot get the filter to be pushed down to ES when reading from an index with 1 billion docs.
Here is the code:

val options = Map("pushdown" -> "true", "double.filtering" -> "false")
val es = sqlContext.read.format("es").options(options).load("myIndex/myDocs").filter($"id".equalTo("idvalue"))

I can't see anything in the logs that indicates a pushdown:
16/04/11 13:54:42 DEBUG ScalaEsRowRDD: Using pre-defined reader serializer [org.elasticsearch.spark.sql.ScalaRowValueReader] as default
16/04/11 13:54:42 DEBUG ScalaEsRowRDD: Using pre-defined reader serializer [org.elasticsearch.spark.sql.ScalaRowValueReader] as default
16/04/11 13:54:42 DEBUG ScalaEsRowRDD: Partition reader instance [EsPartition [node=[wZ96KouxSueLH7petpE1Gw/opvaames04|xxxxxxxxxx:9200],shard=1]] assigned to [wZ96KouxSueLH7petpE1Gw]:[9200]
16/04/11 13:54:42 DEBUG ScalaEsRowRDD: Partition reader instance [EsPartition [node=[-Edj1O1LTcK6_cz1XjOEGA/opvaames06|xxxxxxxxxxx:9200],shard=5]] assigned to [-Edj1O1LTcK6_cz1XjOEGA]:[9200]

I have also tried it this way; it doesn't return any docs, but it's quick:
val es = sqlContext.esDF("myIndex/myDocs", "?q=idValue")
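As a side note, an unqualified URI query like ?q=idValue runs a query-string query against the _all field, so it may simply not match the id field. A field-qualified variant (a sketch; the field name and value are taken from the snippets above) would look like:

val es = sqlContext.esDF("myIndex/myDocs", "?q=id:idvalue")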

My setup is Spark 1.6.0 with the Elasticsearch connector 2.2.0 and Elasticsearch 2.1.1.

Do you have any advice or a method to test a simple pushdown?
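For reference, one way to check this without digging through the logs is to print the query plan: filters that were actually handed to the data source show up in the physical plan. A minimal sketch (the exact PushedFilters formatting depends on the Spark version):

val options = Map("pushdown" -> "true", "double.filtering" -> "false")
val df = sqlContext.read.format("es").options(options).load("myIndex/myDocs")
df.filter($"id".equalTo("idvalue")).explain(true)
// In the physical plan output, look for a PushedFilters entry, e.g.
// Scan ElasticsearchRelation(...) PushedFilters: [EqualTo(id,idvalue)]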

I have done some tests, and I don't run into this issue when filtering on a flat DataFrame structure. My problem is that I am building my ES documents by joining multiple datasets. My structure is like this:
myObject
|----- mySubObject1
|--------------------- field1
|--------------------- field2
|----- mySubObject2
|--------------------- field3

so I need to do:
val test = sqlContext.read.format("es").load("myIndex/myDocs").filter($"mySubObject1.field1".equalTo("value"))

The pushdown doesn't occur...

First off, try using ES-Hadoop 2.3.0 or 2.2.1.
Regarding the pushdown, you can enable logging on the Spark package (it looks like you did) at TRACE level and see whether anything shows up. Try a simple test (select A from X where B > 0) and you should see the query being translated.
Note that pushdown only happens if Spark triggers it. I'm not clear on what you mean by building "documents into ES joining".
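For concreteness, that simple test could look like the sketch below (assuming log4j for logging; org.elasticsearch.spark.sql is the connector's Spark SQL package, and the table/column names A, B, X are placeholders):

// log4j.properties: log4j.logger.org.elasticsearch.spark.sql=TRACE
val df = sqlContext.read.format("es").option("pushdown", "true").load("myIndex/myDocs")
df.registerTempTable("X")
sqlContext.sql("SELECT A FROM X WHERE B > 0").show()
// With TRACE enabled, the translated Elasticsearch query DSL should appear in the logs.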

I have already upgraded the connector to 2.3 without better results. As I said, I am building my documents by joining two datasets: I build myObject with a join between mySubObject1 and mySubObject2, which is why my data structure isn't flat.
I will have to flatten my schema, which requires an additional processing step in my Spark job...
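For what it's worth, that flattening step could be as simple as aliasing the nested fields to top-level columns before writing to ES. A sketch, where joined stands in for the hypothetical result of the join and myFlatIndex is an assumed target:

import org.elasticsearch.spark.sql._

val flatDocs = joined.select(
  $"mySubObject1.field1".as("field1"),
  $"mySubObject1.field2".as("field2"),
  $"mySubObject2.field3".as("field3"))
flatDocs.saveToEs("myFlatIndex/myDocs")
// Filters on the now top-level field1/field2/field3 can then be pushed down on read.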

The pushdown applies only to documents that are stored in ES. If your documents are joined (which, by the way, is an operation that is not pushed down by Spark), it means they exist in Spark, hence there's nothing really to be pushed down.

Of course my docs are in ES... Anyway, the problem is with the non-flat data structure: with sub-documents inside documents (object.subObject in Spark SQL), the filter isn't pushed down.
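This matches how the Spark 1.x data source API behaves, as I understand it: the filters handed to a connector (org.apache.spark.sql.sources.Filter) only reference top-level column names, so a predicate on a nested field is never translated into a pushable filter in the first place. A minimal illustration of that contract:

import org.apache.spark.sql.sources.{EqualTo, Filter}

// What the connector receives for a predicate on a top-level column:
val pushed: Filter = EqualTo("id", "idvalue")
// A predicate on mySubObject1.field1 is simply not translated by the planner,
// so the connector never sees it and the filter is evaluated in Spark instead.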
