How to search ES from Spark SQL with the 'match' function (or another way)

Issue description

I want to query ES with Spark SQL, but predicate pushdown does not take effect.
Description

Steps to reproduce

Code:
First, I create an index with this mapping:

curl -H "Content-Type: application/json" -XPOST 'http://localhost:9200/myindex/_mapping?pretty=true' -d '
{
  "properties": {
    "author": {
      "type": "text",
      "analyzer": "ik_max_word",
      "search_analyzer": "ik_max_word",
      "fields": {
        "raw": {
          "type": "keyword"
        }
      }
    },
    "name": {
      "type": "text",
      "analyzer": "ik_max_word",
      "search_analyzer": "ik_max_word",
      "fields": {
        "raw": {
          "type": "keyword"
        }
      }
    },
    "uuid": {
      "type": "keyword"
    }
  }
}'

Then I create a global temporary view in Spark:

import org.apache.spark.SparkConf
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.hive.thriftserver.HiveThriftServer2

val conf = new SparkConf().setAppName("sss").set("es.nodes", "countly-new002").set("es.port", "9200")
val spark = SparkSession.builder().master("local[*]").appName("spark_es").config(conf).enableHiveSupport().getOrCreate()
// "pushdown" and "strict" are options of the org.elasticsearch.spark.sql data source, so they are passed to the reader
val df = spark.read.format("org.elasticsearch.spark.sql").options(Map("es.nodes" -> "countly-new002", "es.port" -> "9200", "es.resource" -> "myindex", "es.index.auto.create" -> "false", "pushdown" -> "true", "strict" -> "false")).load()
df.createOrReplaceGlobalTempView("myindex")
HiveThriftServer2.startWithContext(spark.sqlContext)

When I query the data from ES with Spark SQL, no rows come back:

select * from global_temp.myindex where  name = '慕容公子' limit 10;
+---------+-------+-------+--+
| author  | name  | uuid  |
+---------+-------+-------+--+
+---------+-------+-------+--+
explain select * from global_temp.myindex where  name = '慕容公子' limit 10;
| == Physical Plan ==
CollectLimit 10
+- *(1) Filter (isnotnull(name#1) && (name#1 = 慕容公子))
   +- *(1) Scan ElasticsearchRelation(Map(es.internal.spark.sql.pushdown -> true, es.index.auto.create -> false, es.port -> 9200, es.resource -> myindex, es.nodes -> countly-new002, es.internal.spark.sql.pushdown.strict -> false),org.apache.spark.sql.SQLContext@2fc0ad21,None) [author#0,name#1,uuid#2] PushedFilters: [IsNotNull(name), EqualTo(name,慕容公子)], ReadSchema: struct<author:string,name:string,uuid:string>  

The PushedFilters are '[IsNotNull(name), EqualTo(name,慕容公子)]', and 'es.internal.spark.sql.pushdown.strict' is false.
According to the ES-Hadoop documentation, Spark SQL filters are pushed down as follows:

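+----------------------------+-------------------+---------------------------+
| SQL syntax                 | ES 1.x/2.x syntax | ES 5.x syntax             |
+----------------------------+-------------------+---------------------------+
| = null , is_null           | missing           | must_not.exists           |
| = (strict)                 | term              | term                      |
| = (not strict)             | match             | match                     |
| > , < , >= , <=            | range             | range                     |
| is_not_null                | exists            | exists                    |
| in (strict)                | terms             | terms                     |
| in (not strict)            | or.filters        | bool.should               |
| and                        | and.filters       | bool.filter               |
| or                         | or.filters        | bool.should [bool.filter] |
| not                        | not.filter        | bool.must_not             |
| StringStartsWith           | wildcard(arg*)    | wildcard(arg*)            |
| StringEndsWith             | wildcard(*arg)    | wildcard(*arg)            |
| StringContains             | wildcard(arg)     | wildcard(arg)             |
| EqualNullSafe (strict)     | term              | term                      |
| EqualNullSafe (not strict) | match             | match                     |
+----------------------------+-------------------+---------------------------+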
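To make the table concrete, here is a simplified model of how the two PushedFilters from the plan above would map to query DSL (ES 5.x column). The names PushdownSketch, Filter, translate and toQuery are hypothetical and are not es-hadoop's actual classes; the output shape mirrors the request body in the debug log below ('exists' plus 'match' under 'bool.filter', with 'match_all' under 'bool.must').

```scala
object PushdownSketch {
  // Simplified stand-ins for the two Spark filters seen in PushedFilters.
  sealed trait Filter
  final case class IsNotNull(attribute: String) extends Filter
  final case class EqualTo(attribute: String, value: String) extends Filter

  // One table row per case; values are not JSON-escaped in this sketch.
  def translate(f: Filter, strict: Boolean): String = f match {
    case IsNotNull(a) => s"""{"exists":{"field":"$a"}}"""
    case EqualTo(a, v) =>
      // strict -> exact 'term'; not strict -> analyzed full-text 'match'
      val kind = if (strict) "term" else "match"
      s"""{"$kind":{"$a":"$v"}}"""
  }

  // Pushed filters go under bool.filter; the default query under bool.must is match_all.
  def toQuery(filters: Seq[Filter], strict: Boolean): String =
    filters.map(translate(_, strict)).mkString(
      """{"query":{"bool":{"must":[{"match_all":{}}],"filter":[""", ",", "]}}}")
}
```

With strict=false, EqualTo(name,慕容公子) becomes a 'match' query, so Elasticsearch itself should return analyzed hits for '=' as well.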

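So with strict set to false, the '=' filter should be executed as a 'match' query in ES, but no rows are returned.
When I run a 'match' query with Elasticsearch SQL (EQL), it works: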

select * from myindex where  match(name,'慕容公子') limit 10;
author|name|uuid                
----|----|----
金庸| 纷纷猜测是慕容公子与小姐外出踏青了。|1c5249c4-91a1-11ea-ac36-00163e014c29

With Spark SQL, the same query fails:

select * from global_temp.myindex where  match(name,'慕容公子') limit 10;
Error: org.apache.spark.sql.AnalysisException: Undefined function: 'match'. This function is neither a registered temporary function nor a permanent function registered in the database 'default'.; line 1 pos 41 (state=,code=0)

The debug log is:

2020-05-12 18:23:34 DEBUG HttpConnection:692 - Open connection to 10.160.15.251:9200
2020-05-12 18:23:34 DEBUG header:70 - >> "POST /library/_search?sort=_doc&scroll=5m&size=2&preference=_shards%3A0%7C_local&track_total_hits=true HTTP/1.1[\r][\n]"
2020-05-12 18:23:34 DEBUG HttpMethodBase:1352 - Adding Host request header
2020-05-12 18:23:34 DEBUG header:70 - >> "Content-Type: application/json[\r][\n]"
2020-05-12 18:23:34 DEBUG header:70 - >> "Accept: application/json[\r][\n]"
2020-05-12 18:23:34 DEBUG header:70 - >> "User-Agent: Jakarta Commons-HttpClient/3.1[\r][\n]"
2020-05-12 18:23:34 DEBUG header:70 - >> "Host: 10.160.15.251:9200[\r][\n]"
2020-05-12 18:23:34 DEBUG header:70 - >> "Content-Length: 168[\r][\n]"
2020-05-12 18:23:34 DEBUG header:70 - >> "[\r][\n]"
2020-05-12 18:23:34 DEBUG content:84 - >> "{"query":{"bool":{"must":[{"match_all":{}}],"filter":[{"exists":{"field":"name"}},{"match":{"name":"Wakes"}}]}},"_source":["author","name","page_count","release_date"]}"
2020-05-12 18:23:34 DEBUG EntityEnclosingMethod:508 - Request body sent
2020-05-12 18:23:34 DEBUG header:70 - << "HTTP/1.1 200 OK[\r][\n]"
2020-05-12 18:23:34 DEBUG header:70 - << "HTTP/1.1 200 OK[\r][\n]"
2020-05-12 18:23:34 DEBUG header:70 - << "content-type: application/json; charset=UTF-8[\r][\n]"
2020-05-12 18:23:34 DEBUG header:70 - << "content-length: 566[\r][\n]"
2020-05-12 18:23:34 DEBUG header:70 - << "[\r][\n]"
2020-05-12 18:23:34 DEBUG content:84 - << "{"_scroll_id":"DXF1ZXJ5QW5kRmV0Y2gBAAAAAAAAAPcWLTZ0SktUdXRSWkNvYUVONkNsNVBqZw==","took":1,"timed_out":false,"_shards":{"total":1,"successful":1,"skipped":0,"failed":0},"hits":{"total":{"value":3,"relation":"eq"},"max_score":null,"hits":[{"_index":"library","_type":"book","_id":"Leviathan Wakes","_score":null,"_source":{"release_date":"2011-06-02","author":"James S.A. Corey","name":"Leviathan Wakes","page_count":561},"sort":[0]},{"_index":"library","_type":"book","_id":"FidoCHIBfHIo0bf8rilh","_score":null,"_source":{"author":"a","name":"Wakes 1"},"sort":[3]}]}}"
2020-05-12 18:23:34 DEBUG HttpMethodBase:1024 - Resorting to protocol version default close connection policy
2020-05-12 18:23:34 DEBUG HttpMethodBase:1028 - Should NOT close connection, using HTTP/1.1
2020-05-12 18:23:34 DEBUG HttpConnection:1178 - Releasing connection back to connection manager.
2020-05-12 18:23:34 DEBUG HeaderProcessor:174 - Added HTTP Headers to method: [Content-Type: application/json
, Accept: application/json
]
2020-05-12 18:23:34 DEBUG CommonsHttpTransport:642 - Using regular user provider to wrap rest request
2020-05-12 18:23:34 DEBUG UserGroupInformation:1722 - PrivilegedAction as:admin (auth:SIMPLE) from:org.elasticsearch.hadoop.mr.security.HadoopUser.doAs(HadoopUser.java:66)
2020-05-12 18:23:34 DEBUG header:70 - >> "POST /_search/scroll?scroll=5m HTTP/1.1[\r][\n]"
2020-05-12 18:23:34 DEBUG HttpMethodBase:1352 - Adding Host request header
2020-05-12 18:23:34 DEBUG header:70 - >> "Content-Type: application/json[\r][\n]"
2020-05-12 18:23:34 DEBUG header:70 - >> "Accept: application/json[\r][\n]"
2020-05-12 18:23:34 DEBUG header:70 - >> "User-Agent: Jakarta Commons-HttpClient/3.1[\r][\n]"
2020-05-12 18:23:34 DEBUG header:70 - >> "Host: 10.160.15.251:9200[\r][\n]"
2020-05-12 18:23:34 DEBUG header:70 - >> "Content-Length: 80[\r][\n]"
2020-05-12 18:23:34 DEBUG header:70 - >> "[\r][\n]"
2020-05-12 18:23:34 DEBUG content:84 - >> "{"scroll_id":"DXF1ZXJ5QW5kRmV0Y2gBAAAAAAAAAPcWLTZ0SktUdXRSWkNvYUVONkNsNVBqZw=="}"
2020-05-12 18:23:34 DEBUG EntityEnclosingMethod:508 - Request body sent
2020-05-12 18:23:34 DEBUG header:70 - << "HTTP/1.1 200 OK[\r][\n]"
2020-05-12 18:23:34 DEBUG header:70 - << "HTTP/1.1 200 OK[\r][\n]"
2020-05-12 18:23:34 DEBUG header:70 - << "content-type: application/json; charset=UTF-8[\r][\n]"
2020-05-12 18:23:34 DEBUG header:70 - << "content-length: 396[\r][\n]"
2020-05-12 18:23:34 DEBUG header:70 - << "[\r][\n]"
2020-05-12 18:23:34 DEBUG content:84 - << "{"_scroll_id":"DXF1ZXJ5QW5kRmV0Y2gBAAAAAAAAAPcWLTZ0SktUdXRSWkNvYUVONkNsNVBqZw==","took":1,"timed_out":false,"terminated_early":false,"_shards":{"total":1,"successful":1,"skipped":0,"failed":0},"hits":{"total":{"value":3,"relation":"eq"},"max_score":null,"hits":[{"_index":"library","_type":"book","_id":"FydoCHIBfHIo0bf8rilh","_score":null,"_source":{"author":"b","name":"Wakes 2"},"sort":[4]}]}}"
2020-05-12 18:23:34 DEBUG HttpMethodBase:1024 - Resorting to protocol version default close connection policy
2020-05-12 18:23:34 DEBUG HttpMethodBase:1028 - Should NOT close connection, using HTTP/1.1
2020-05-12 18:23:34 DEBUG HttpConnection:1178 - Releasing connection back to connection manager.
2020-05-12 18:23:34 DEBUG HeaderProcessor:174 - Added HTTP Headers to method: [Content-Type: application/json
, Accept: application/json
]
2020-05-12 18:23:34 DEBUG CommonsHttpTransport:642 - Using regular user provider to wrap rest request
2020-05-12 18:23:34 DEBUG UserGroupInformation:1722 - PrivilegedAction as:admin (auth:SIMPLE) from:org.elasticsearch.hadoop.mr.security.HadoopUser.doAs(HadoopUser.java:66)
2020-05-12 18:23:34 DEBUG header:70 - >> "POST /_search/scroll?scroll=5m HTTP/1.1[\r][\n]"
2020-05-12 18:23:34 DEBUG HttpMethodBase:1352 - Adding Host request header
2020-05-12 18:23:34 DEBUG header:70 - >> "Content-Type: application/json[\r][\n]"
2020-05-12 18:23:34 DEBUG header:70 - >> "Accept: application/json[\r][\n]"
2020-05-12 18:23:34 DEBUG header:70 - >> "User-Agent: Jakarta Commons-HttpClient/3.1[\r][\n]"
2020-05-12 18:23:34 DEBUG header:70 - >> "Host: 10.160.15.251:9200[\r][\n]"
2020-05-12 18:23:34 DEBUG header:70 - >> "Content-Length: 80[\r][\n]"
2020-05-12 18:23:34 DEBUG header:70 - >> "[\r][\n]"
2020-05-12 18:23:34 DEBUG content:84 - >> "{"scroll_id":"DXF1ZXJ5QW5kRmV0Y2gBAAAAAAAAAPcWLTZ0SktUdXRSWkNvYUVONkNsNVBqZw=="}"
2020-05-12 18:23:34 DEBUG EntityEnclosingMethod:508 - Request body sent
2020-05-12 18:23:34 DEBUG header:70 - << "HTTP/1.1 200 OK[\r][\n]"
2020-05-12 18:23:34 DEBUG header:70 - << "HTTP/1.1 200 OK[\r][\n]"
2020-05-12 18:23:34 DEBUG header:70 - << "content-type: application/json; charset=UTF-8[\r][\n]"
2020-05-12 18:23:34 DEBUG header:70 - << "content-length: 265[\r][\n]"
2020-05-12 18:23:34 DEBUG header:70 - << "[\r][\n]"
2020-05-12 18:23:34 DEBUG content:84 - << "{"_scroll_id":"DXF1ZXJ5QW5kRmV0Y2gBAAAAAAAAAPcWLTZ0SktUdXRSWkNvYUVONkNsNVBqZw==","took":1,"timed_out":false,"terminated_early":false,"_shards":{"total":1,"successful":1,"skipped":0,"failed":0},"hits":{"total":{"value":3,"relation":"eq"},"max_score":null,"hits":[]}}"
2020-05-12 18:23:34 DEBUG HttpMethodBase:1024 - Resorting to protocol version default close connection policy
2020-05-12 18:23:34 DEBUG HttpMethodBase:1028 - Should NOT close connection, using HTTP/1.1
2020-05-12 18:23:34 DEBUG HttpConnection:1178 - Releasing connection back to connection manager.
2020-05-12 18:23:34 DEBUG HeaderProcessor:174 - Added HTTP Headers to method: [Content-Type: application/json
, Accept: application/json
]
2020-05-12 18:23:34 DEBUG CommonsHttpTransport:642 - Using regular user provider to wrap rest request
2020-05-12 18:23:34 DEBUG UserGroupInformation:1722 - PrivilegedAction as:admin (auth:SIMPLE) from:org.elasticsearch.hadoop.mr.security.HadoopUser.doAs(HadoopUser.java:66)
2020-05-12 18:23:34 DEBUG header:70 - >> "DELETE /_search/scroll HTTP/1.1[\r][\n]"
2020-05-12 18:23:34 DEBUG HttpMethodBase:1352 - Adding Host request header
2020-05-12 18:23:34 DEBUG header:70 - >> "Content-Type: application/json[\r][\n]"
2020-05-12 18:23:34 DEBUG header:70 - >> "Accept: application/json[\r][\n]"
2020-05-12 18:23:34 DEBUG header:70 - >> "User-Agent: Jakarta Commons-HttpClient/3.1[\r][\n]"
2020-05-12 18:23:34 DEBUG header:70 - >> "Host: 10.160.15.251:9200[\r][\n]"
2020-05-12 18:23:34 DEBUG header:70 - >> "Content-Length: 82[\r][\n]"
2020-05-12 18:23:34 DEBUG header:70 - >> "[\r][\n]"
2020-05-12 18:23:34 DEBUG content:84 - >> "{"scroll_id":["DXF1ZXJ5QW5kRmV0Y2gBAAAAAAAAAPcWLTZ0SktUdXRSWkNvYUVONkNsNVBqZw=="]}"
2020-05-12 18:23:34 DEBUG EntityEnclosingMethod:508 - Request body sent
2020-05-12 18:23:34 DEBUG header:70 - << "HTTP/1.1 200 OK[\r][\n]"
2020-05-12 18:23:34 DEBUG header:70 - << "HTTP/1.1 200 OK[\r][\n]"
2020-05-12 18:23:34 DEBUG header:70 - << "content-type: application/json; charset=UTF-8[\r][\n]"
2020-05-12 18:23:34 DEBUG header:70 - << "content-length: 32[\r][\n]"
2020-05-12 18:23:34 DEBUG header:70 - << "[\r][\n]"
2020-05-12 18:23:34 INFO  Executor:54 - Finished task 0.0 in stage 2.0 (TID 2). 1337 bytes result sent to driver
2020-05-12 18:23:34 DEBUG TaskSchedulerImpl:58 - parentName: , name: TaskSet_2.0, runningTasks: 0
2020-05-12 18:23:34 INFO  TaskSetManager:54 - Finished task 0.0 in stage 2.0 (TID 2) in 32 ms on localhost (executor driver) (1/1)

The log shows that the '_search' and '_search/scroll' APIs returned data,
but Spark SQL returns nothing.

Version Info

OS          :  CentOS 7
JVM         :  JDK 1.8
Hadoop/Spark:  Spark 2.3.2
ES-Hadoop   :  elasticsearch-hadoop-7.6.2
ES          :  Elasticsearch 7.6.2

How can I search ES with the 'match' function from Spark SQL, or is there another way to run a 'match' query from Spark SQL? Thanks.

Feature description

There is no 'match' function in Spark SQL, at least none that it can find when running that line. I suspect this is simply because the grammars of EQL and Spark SQL differ.
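One workaround worth trying, which is my own suggestion rather than something from this thread, is to hand the full-text query to es-hadoop directly through the 'es.query' read setting (it accepts a query DSL body) instead of writing MATCH in SQL. The debug log suggests the connector puts its own query ('match_all' by default) under 'bool.must' and the pushed-down filters under 'bool.filter'. The sketch below only builds the option values; EsQuerySketch, matchQuery, readOptions and the view name 'myindex_match' are hypothetical.

```scala
object EsQuerySketch {
  // Escape backslash and double quote for a JSON string literal
  // (control characters are not handled in this sketch).
  private def jsonEscape(s: String): String =
    s.replace("\\", "\\\\").replace("\"", "\\\"")

  // Hypothetical helper: a query DSL body with a full-text 'match' query on one field.
  def matchQuery(field: String, text: String): String =
    s"""{"query":{"match":{"${jsonEscape(field)}":"${jsonEscape(text)}"}}}"""

  // Reader options for the org.elasticsearch.spark.sql source; "es.query" carries the match
  // query, so Elasticsearch does the full-text matching and the SQL needs no WHERE on 'name'.
  def readOptions(text: String): Map[String, String] = Map(
    "es.nodes" -> "countly-new002",
    "es.port" -> "9200",
    "es.resource" -> "myindex",
    "es.query" -> matchQuery("name", text)
  )

  // Usage (needs a SparkSession `spark` and a reachable cluster):
  //   spark.read.format("org.elasticsearch.spark.sql").options(readOptions("慕容公子")).load()
  //     .createOrReplaceGlobalTempView("myindex_match")
  //   select * from global_temp.myindex_match limit 10;
}
```

Because the view has no Spark-side predicate on 'name', there is nothing for Spark to re-check; the trade-off is that the search text is fixed when the view is created.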

If you see data being returned to Spark but no results at the end, Spark is probably filtering the records out. Unless told otherwise, Spark validates all the data it receives against its own understanding of the predicate; this is the Filter step above the Elasticsearch scan in the physical plan. If you rely on the match query to pull in data that is analyzed and therefore not 100% identical to the search term, Spark is probably throwing it out: '纷纷猜测是慕容公子与小姐外出踏青了。' matches '慕容公子' in Elasticsearch but is not equal to it. Try again with double.filtering set to false (it defaults to true), which should tell Spark not to run the pushed-down filters against the data to validate it.
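A sketch of both halves of this answer: the reader options with 'double.filtering' turned off (the other values come from the question), and a simplified model of the Filter step Spark applies on top of the scan. DoubleFilteringSketch, readOptions and sparkSideFilter are hypothetical names; the model only covers exact string equality, not Spark's full predicate evaluation.

```scala
object DoubleFilteringSketch {
  // Reader options for the org.elasticsearch.spark.sql source, based on the ones in the question.
  // "double.filtering" -> "false" asks es-hadoop not to have Spark re-apply the pushed-down filters.
  val readOptions: Map[String, String] = Map(
    "es.nodes" -> "countly-new002",
    "es.port" -> "9200",
    "es.resource" -> "myindex",
    "pushdown" -> "true",
    "strict" -> "false",
    "double.filtering" -> "false"
  )
  // Usage (needs a SparkSession `spark` and a reachable cluster):
  //   spark.read.format("org.elasticsearch.spark.sql").options(readOptions).load()

  // Simplified model of the Spark-side Filter node: with double filtering on,
  // every value returned by Elasticsearch must equal the SQL literal exactly.
  def sparkSideFilter(returned: Seq[String], literal: String, doubleFiltering: Boolean): Seq[String] =
    if (doubleFiltering) returned.filter(_ == literal) else returned

  // A value the analyzed 'match' query finds (from the Elasticsearch SQL result in the question).
  val matchedName: String = "纷纷猜测是慕容公子与小姐外出踏青了。"
}
```

With double filtering on, the analyzed hit is discarded, which matches the empty result in the question; with it off, the hit survives.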
