Hi,
I am creating a DataFrame by specifying a query alongside my index and type name. Everything works fine unless I additionally filter the resulting DataFrame on the Spark side. According to the documentation, "Note the push down operations apply even when one specifies a query - the connector will enhance it according to the specified SQL". As I understand it, my Spark-side filter should be combined with the initial query, not replace it. However, it seems that the initial query is ignored when push-down is enabled.
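For context, as far as I understand, the esDF calls I use below are equivalent to reading through the DataSource API with "es.query" set (a minimal sketch; "myindex/locationEvent" stands in for my actual index/type, and validEventsQuery is the filter shown further down):

import org.apache.spark.sql.DataFrame

// Sketch of the equivalent DataSource read. "es.query" carries the initial
// filter, which I expect push-down to enhance with the Spark-side filters
// rather than replace.
val df: DataFrame = hiveContext.read
  .format("org.elasticsearch.spark.sql")
  .option("es.query", validEventsQuery)
  .load("myindex/locationEvent")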
I have the following use case.
Mapping:
"mappings": {
"locationEvent": {
"properties": {
"blockId": {
"type": "string",
"index": "not_analyzed"
},
"deviation": {
"type": "long"
},
"distance": {
"type": "long"
},
"doorsState": {
"type": "string",
"index": "not_analyzed"
},
"lineId": {
"type": "string",
"index": "not_analyzed"
},
"routeId": {
"type": "string",
"index": "not_analyzed"
},
"stopPointId": {
"type": "string",
"index": "not_analyzed"
},
"time": {
"type": "date",
"format": "yyyy-MM-dd'T'HH:mm:ss.SSSZ"
},
"tripNumber": {
"type": "string",
"index": "not_analyzed"
}
}
}
}
Initial filter (referred to below as validEventsQuery):
{
  "filter": {
    "bool": {
      "must": [
        {
          "range": {
            "time": {
              "gte": "2016-08-30T00:00:00.000Z",
              "lt": "2016-08-31T02:00:00.000Z"
            }
          }
        },
        {
          "or": [
            {
              "not": {
                "term": { "distance": "0" }
              }
            },
            {
              "and": [
                { "term": { "distance": "0" } },
                { "term": { "doorsState": "open" } }
              ]
            }
          ]
        },
        { "exists": { "field": "deviation" } },
        { "exists": { "field": "distance" } },
        { "exists": { "field": "lineId" } },
        { "exists": { "field": "tripNumber" } },
        { "exists": { "field": "routeId" } },
        { "exists": { "field": "doorsState" } },
        { "exists": { "field": "time" } },
        { "exists": { "field": "stopPointId" } }
      ]
    }
  }
}
- Default config, no filtering on the Spark side:

val eventCount = hiveContext.esDF(s"$index/$type", validEventsQuery).count()

The result is 480365; the ES REST API gives the same number.

- Default config, additional filtering on the Spark side:

val eventCount = hiveContext.esDF(s"$index/$type", validEventsQuery).filter('stopPointId !== "").count()

The result is 715467. The ES REST API returns this number only when querying with the stopPointId filter alone, i.e. with validEventsQuery ignored.
If I disable push-down, everything works as expected. I also tried setting the connector options "strict" to true and "double.filtering" to true/false, but neither changed the result.
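For completeness, this is roughly how I set those options (a sketch; the option names "pushdown", "strict" and "double.filtering" are the ones from the connector's Spark SQL documentation, and the values shown are just the combinations I tried):

// Reading with the push-down options set explicitly; turning "pushdown"
// off is the only combination that gives me correct counts.
val df = hiveContext.read
  .format("org.elasticsearch.spark.sql")
  .option("pushdown", "false")          // with "true" my initial query is ignored
  .option("strict", "true")             // tried both true and false
  .option("double.filtering", "false")  // tried both true and false
  .option("es.query", validEventsQuery)
  .load("myindex/locationEvent")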
It would be great if anyone could tell me what I am doing wrong. Thanks in advance.
Versions:
sparkVersion = '1.6.1'
esVersion = '2.3.2'
esSparkConnectorVersion = '2.3.3'