Spark push-down query enhancement


(Aokolnychyi) #1

Hi,

I am creating a data frame by specifying a filter alongside my index and type name. Everything works fine until I additionally filter the resulting data frame on the Spark side. According to the documentation, "Note the push down operations apply even when one specifies a query - the connector will enhance it according to the specified SQL". As I understand it, my filter should be enhanced, not ignored. However, it seems that my initial query is ignored when push-down is enabled.

I have the following use case.

Mapping:

"mappings": {
  "locationEvent": {
    "properties": {
      "blockId": {
        "type": "string",
        "index": "not_analyzed"
      },
      "deviation": {
        "type": "long"
      },
      "distance": {
        "type": "long"
      },
      "doorsState": {
        "type": "string",
        "index": "not_analyzed"
      },
      "lineId": {
        "type": "string",
        "index": "not_analyzed"
      },
      "routeId": {
        "type": "string",
        "index": "not_analyzed"
      },
      "stopPointId": {
        "type": "string",
        "index": "not_analyzed"
      },
      "time": {
        "type": "date",
        "format": "yyyy-MM-dd'T'HH:mm:ss.SSSZ"
      },
      "tripNumber": {
        "type": "string",
        "index": "not_analyzed"
      }
    }
  }
}

Initial filter (later as validEventsQuery):

{
  "filter": {
    "bool": {
      "must": [
        {
          "range": {
            "time": {
              "gte": "2016-08-30T00:00:00.000Z",
              "lt": "2016-08-31T02:00:00.000Z"
            }
          }
        },
        {
          "or": [
            {
              "not": {
                "term": {
                  "distance": "0"
                }
              }
            },
            {
              "and": [
                {
                  "term": {
                    "distance": "0"
                  }
                },
                {
                  "term": {
                    "doorsState": "open"
                  }
                }
              ]
            }
          ]
        },
        {
          "exists": {
            "field": "deviation"
          }
        },
        {
          "exists": {
            "field": "distance"
          }
        },
        {
          "exists": {
            "field": "lineId"
          }
        },
        {
          "exists": {
            "field": "tripNumber"
          }
        },
        {
          "exists": {
            "field": "routeId"
          }
        },
        {
          "exists": {
            "field": "doorsState"
          }
        },
        {
          "exists": {
            "field": "time"
          }
        },
        {
          "exists": {
            "field": "stopPointId"
          }
        }
      ]
    }
  }
}

  1. Default config, no filtering on the Spark side.
    val eventDF = hiveContext.esDF(s"$index/$type", validEventsQuery).count()
    The result is 480365. The ES REST API gives the same number.
  2. Default config, additional filtering on the Spark side.
    val eventDF = hiveContext.esDF(s"$index/$type", validEventsQuery).filter('stopPointId !== "").count()
    The result is 715467. The ES REST API returns this number when only the filter stopPointId !== "" is applied.

If I disable the push-down, then everything works as expected. I also tried to set "strict" to true and "double.filtering" to true/false but nothing worked for me.

It would be great if anyone could tell me what I am doing wrong. Thanks in advance.

Versions:
sparkVersion = '1.6.1'
esVersion = '2.3.2'
esSparkConnectorVersion = '2.3.3'


(Aokolnychyi) #2

The corresponding enhanced query in the slow search log:

    {
      "query": {
        "filtered": {
          "filter": {
            "bool": {
              "must": [
                {
                  "range": {
                    "time": {
                      "gte": "2016-08-30T00:00:00.000Z",
                      "lt": "2016-08-31T02:00:00.000Z"
                    }
                  }
                },
                {
                  "or": [
                    {
                      "not": {
                        "term": {
                          "distance": "0"
                        }
                      }
                    },
                    {
                      "and": [
                        {
                          "term": {
                            "distance": "0"
                          }
                        },
                        {
                          "term": {
                            "doorsState": "open"
                          }
                        }
                      ]
                    }
                  ]
                },
                {
                  "exists": {
                    "field": "deviation"
                  }
                },
                {
                  "exists": {
                    "field": "distance"
                  }
                },
                {
                  "exists": {
                    "field": "lineId"
                  }
                },
                {
                  "exists": {
                    "field": "tripNumber"
                  }
                },
                {
                  "exists": {
                    "field": "routeId"
                  }
                },
                {
                  "exists": {
                    "field": "doorsState"
                  }
                },
                {
                  "exists": {
                    "field": "time"
                  }
                },
                {
                  "exists": {
                    "field": "stopPointId"
                  }
                }
              ]
            }
          },
          "filter": {
            "and": [
              {
                "not": {
                  "filter": {
                    "query": {
                      "match": {
                        "stopPointId": ""
                      }
                    }
                  }
                }
              }
            ]
          }
        }
      }
    }

(James Baiera) #3

It looks like the Query DSL you provided is missing the top-level "query" attribute, so the generated query has two "filter" entries underneath the "filtered" query that marries the pushdown with the provided query.

If you wrap your initially provided Query DSL in a {"query":{"filtered":{ ... }}} block, it should correctly construct the pushdown filter.
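
A minimal sketch of that wrapping, assuming the query is kept as a plain JSON string on the Scala side. The helper name wrapInFilteredQuery and the shortened boolFilter are made up for illustration; the real query would carry the full "bool" filter from the first post.

```scala
object WrapQuery {
  // Hypothetical helper: takes the JSON object that previously sat under the
  // top-level "filter" key and nests it inside a "filtered" query, so the
  // document passed to the connector starts with a top-level "query".
  def wrapInFilteredQuery(filterBody: String): String =
    s"""{"query":{"filtered":{"filter":$filterBody}}}"""

  // Shortened stand-in for the "bool" filter from the original post.
  val boolFilter: String =
    """{"bool":{"must":[{"exists":{"field":"stopPointId"}}]}}"""

  // The string to pass as the second argument of esDF.
  val validEventsQuery: String = wrapInFilteredQuery(boolFilter)
}
```

The resulting validEventsQuery would then be passed exactly as before, e.g. hiveContext.esDF(s"$index/$type", validEventsQuery).filter('stopPointId !== ""). Note that this relies on the "filtered" query, which fits the 2.x versions used in this thread.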


(Aokolnychyi) #4

Thanks a lot for your reply, I appreciate it.
You are right, it worked.

