I'm trying to read data from Elasticsearch with Spark on an index that might not exist, since my index names follow a date pattern. Because this is expected in my situation, I just want an empty dataset in that case. Is there a way to do it?
I'm using Spark 2.2.1 and Elasticsearch 5.6.7. I tried setting "es.index.read.missing.as.empty" to yes and providing my StructType, without any luck.
Here is a sample of my code:
import java.sql.Timestamp

import org.apache.spark.sql.functions.{col, lit}
import org.apache.spark.sql.types.{StringType, StructType, TimestampType}

// Schema passed explicitly so the connector shouldn't need to fetch the index mapping
val elasticSearchSchema = new StructType()
  .add("name", StringType)
  .add("client", StringType)
  .add("timestamp", TimestampType)

// queryStartDate and startPeriod are java.time.Instant values defined elsewhere in the job
val keepOnlySessionWithinPeriod =
  col("timestamp")
    .geq(lit(Timestamp.from(queryStartDate)))
    .and(col("timestamp")
      .lt(lit(Timestamp.from(startPeriod))))

val loadFromElastic = sparkSession
  .read
  .option("es.index.read.missing.as.empty", "true")
  .schema(elasticSearchSchema)
  .format("org.elasticsearch.spark.sql")
  .load("my-index-2018-02-01/mytype")
  .filter(keepOnlySessionWithinPeriod)
With Spark SQL the behaviour is an exception rather than an empty result, and in my specific case I don't query multiple indices. Since my indices are time based I know how they are named; I just don't know whether a given one exists.
Right now I'm getting an exception and the Spark process terminates. What I'm expecting is an empty result so that I can continue my process and re-insert data into Elasticsearch, like I can do with a plain HTTP request using the ignore_unavailable and allow_no_indices flags.
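The only workaround I can see for now is to check myself whether the index exists before calling load, and to build an empty Dataset from my schema when it doesn't. A rough sketch of what I mean (the host localhost:9200 and the indexExists helper are just for illustration, not part of my real job):

import java.net.{HttpURLConnection, URL}
import org.apache.spark.sql.{DataFrame, Row}

// HEAD on the index URL: 200 when the index exists, 404 when it doesn't
def indexExists(esHost: String, index: String): Boolean = {
  val connection = new URL(s"http://$esHost/$index")
    .openConnection().asInstanceOf[HttpURLConnection]
  connection.setRequestMethod("HEAD")
  try connection.getResponseCode == 200
  finally connection.disconnect()
}

val loadFromElastic: DataFrame =
  if (indexExists("localhost:9200", "my-index-2018-02-01"))
    sparkSession
      .read
      .schema(elasticSearchSchema)
      .format("org.elasticsearch.spark.sql")
      .load("my-index-2018-02-01/mytype")
      .filter(keepOnlySessionWithinPeriod)
  else
    // Empty Dataset with the same schema, so the rest of the job keeps working
    sparkSession.createDataFrame(sparkSession.sparkContext.emptyRDD[Row], elasticSearchSchema)

But that means an extra round trip per index and duplicating logic the connector already has, which is why I'd prefer a supported option.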
I've been able to dig down to the fact that, even when we provide a schema, the DataSource class still tries to load the mapping. See DefaultSource.scala.
I did try to fix it, but one other test started failing and getting an integration test running was nearly impossible: I had to wait 30+ minutes to test my code. For now I've been able to use my global alias to search my data.
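In other words, instead of loading the dated index directly I read through an alias that always exists (my-global-alias is just a placeholder for whatever alias covers the dated indices) and let the timestamp filter narrow things down:

// Reading through an always-existing alias instead of the dated index
val loadFromAlias = sparkSession
  .read
  .schema(elasticSearchSchema)
  .format("org.elasticsearch.spark.sql")
  .load("my-global-alias/mytype")
  .filter(keepOnlySessionWithinPeriod)

It works, but it scans more than I'd like, so a real equivalent of ignore_unavailable would still be welcome.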