Passing variables to ES query


I am searching for a solution to this problem for quite some time and I cannot make it work.
I am using es.query to perform an Elasticsearch range query on a date field.

GET /index/type/_search?pretty=true
    "query" : {
        "range" : {
            "indexed" : { 
                "gte" : "2016-04-09T16:49:47.927+0000"

Written as Spark Code:

val conf = new SparkConf() .setMaster("local[*]") .setAppName("SparkLemma")
.set("es.nodes", "hadoop-m:9200" )
.set("es.resource", "index/type")
.set("es.query", """{ "query" : { "range" : { "indexed" : { "gte": "2016-04-09T16:49:47.927+0000" } } } }""")
val esRDD = sc.esRDD()

Works great too. The result is the same.

Now I get stuck when I try to use a variable instead of a fixed date. I am using Scala and wrote a function that returns the actual date in ISO8601 format.

def getDate() : String = {
	val tz = TimeZone.getTimeZone("UTC")
	val df = new SimpleDateFormat("yyyy-MM-dd'T'HH:mm:ss.SSSZ")
	df.format(new Date())
val localTime = getDate()

val conf = new SparkConf() .setMaster("local[*]") .setAppName("SparkLemma")
	.set("es.nodes", "hadoop-m:9200" )
	.set("es.resource", "index/type")
	.set("es.query", """{ "query" : { "range" : { "indexed" : { "gte": "localTime" } } } }""")
val esIndex = "elastic/documents"
val sc = new SparkContext(conf)
val esRDD = sc.esRDD()

Now, Spark does query for "localTime" and I get an error that the String cannot be converted into the ISO8601 date format Elasticsearch uses. I searched the Elasticsearch documentation and other discussion boards but the closest things I found were Template Queries and Elasticsearch query does not take variable?

Is there a way to pass a variable to an Elasticsearch query?
I am using Spark 1.4.1 and elasticsearch-spark_2.10-2.2.0-beta1.jar

Thanks in advance!

1 Like

I figured it out on my own.
Supposing that getDate() returns for a string of the actual date. I used the string interpolation of Scala and passed the string "query" to ES by using brackets.

val localTime = getDate()
val query = s"""{ "query" : { "range" : { "indexed" : { "gte": "$localTime" } } } }"""

val conf = new SparkConf() .setMaster("local[*]") .setAppName("SparkLemma")
	.set("es.nodes", "hadoop-m:9200" )
	.set("es.resource", "elastic/documents")
	.set("es.write.operation", "upsert")
	.set("", "id")
	.set("", "false")
	.set("es.query", {query})
val esIndex = "index/type"
val sc = new SparkContext(conf)
val esRDD = sc.esRDD()
1 Like