Spark sql pushdown conjunction filters


I'm trying to use elasticsearch as a data source for a spark script. Everything works fine but when I use a conjunctive filter:

data.filter(data("user").equalTo("foo").and(data("start").lt("2015-10-25 01:00:000")).and(data("end").gt("2015-10-25 00:00:000")))

The pushdown only seems to work for the first filter in the conjunction. When I remove both the filter on "start" and "end", the same amount of data is sent to the query node as in the case of the full filter. Is this a known issue and is there a fix? When I do a count aggregation on the result, I get huge differences so the date filters restrict the result a lot.

Furthermore, it would be really convenient if aggregations where also pushed down to the elasticsearch cluster when possible. Is it known when this will be the case?


What is the resulting queryDSL vs what you are expecting? You can find this out by enabling TRACE logging on the org.elasticsearch.spark package.

When I enable trace logging, I see the following two lines which I think are relevant:

15/11/25 08:34:50 DEBUG DataSource: Pushing down filters [EqualTo(user,foo)]
15/11/25 08:34:50 TRACE DataSource: Transformed filters into DSL $filterString

To me, this seems to show that only the first filter is pushed down. Does this maybe have something to do with the date format of the date filters? On further testing I do see that:


does get properly pushed down:

15/11/25 08:43:02 DEBUG DataSource: Pushing down filters [Or(EqualTo(user,foo1),EqualTo(user,foo2))]

When I change the lt filter to a equalTo filter, it does get pushed down. Are lt and gt filters not yet supported for pushdown? Is there a workaround?

I've resolved the issue. It seems that the lt filter cannot be pushed down with a string as argument. Changing the argument to a java.sql.Timestamp allowed the filter to be pushed down.

@joris_renkens Sorry but I don't follow. For each step can you post the sample Spark query and the resulting queryDSL in the logs and what is that you expect?
Also what version of Spark and ES-Hadoop are you using?

It is unclear what works and what doesn't, whether the queryDSL is incomplete (it might be because Spark SQL doesn't pass in all the Filters) or incorrect. In other words, whether the optimization is not what you would expect or if it's a bug.
And the info request above would help with that greatly.


@costin I don't actually get to see the queryDSL, I get the following line:
15/11/25 08:34:50 TRACE DataSource: Transformed filters into DSL $filterString

What I do see is:
15/11/25 08:34:50 DEBUG DataSource: Pushing down filters [EqualTo(user,foo)]
for the query:
.and(data("start").lt("2015-10-25 01:00:000"))
.and(data("end").gt("2015-10-25 00:00:000"))

This tells me that when I pass timestamps as strings, the filters don't get pushed down. However, I get the following log message:
15/11/27 08:14:43 DEBUG DataSource: Pushing down filters [EqualTo(user,foo),LessThan(start,2015-10-25 01:00:00.0),GreaterThan(end,2015-10-25 00:00:00.0)]
for the query:
.and(data("start").lt(new Timestamp(1445727600000)))
.and(data("end").gt(new Timestamp(1445724000000)))
Which tells me that the filters do get pushed down when the timestamp is provided as a Timestamp object.

Is this enough info?


That looks like a bug. What version of ES-Hadoop are you using?

What does your DataFrame look like? What about your ES mapping? Lastly, what version of Spark and ES are you using?

Sorry for the late answer. My versions:
"elasticsearch-spark" % "2.2.0-beta1"
"spark-core" % "1.5.2"

my dataframe has 4 fields: title,recordStart,recordEnd,recordDuration

"recordEnd": {
       "type": "date",
       "format": "dateOptionalTime"
"recordStart": {
	"type": "date",
	"format": "dateOptionalTime"
"recordDuration": {
	"type": "long"
"title": {
	"type": "string",
	"fields": {
		"raw": {"type":"string", "index":"not_analyzed"}

The issue was handling the dateType which requires a strict (aka value) querying not a terms one. This has been solved in ES-Hadoop 2.2-rc1 - can you please try it out?