Spark Structured Streaming

Hi there! I'm reading files from a folder via Spark Structured Streaming, and I don't want to hard-code the index name. Instead, I want to use the dynamic/multi-resource feature described in the docs:
" Writing to dynamic/multi-resourcesedit

For cases when the data being written to Elasticsearch needs to be indexed under different buckets (based on the data content) one can use the es.resource.write field which accepts a pattern that is resolved from the document content, at runtime. Following the aforementioned media example, one could configure it as follows:

Scalaedit
**val book = Map("media_type" -> "book","title" -> "Harry Potter","year" -> "2010") **

** val cd = Map("media_type" -> "music","title" -> "Surfing With The Alien") **
** val batch = sc.makeRDD(Seq(game, book, cd)) **
** val microbatches = mutable.Queue(batch) **
** ssc.queueStream(microbatches).saveToEs("my-collection-{media_type}/doc") **
** ssc.start()"**
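For reference, since my job is in PySpark rather than Scala, here's a minimal batch sketch of the same idea as I understand it (the index and field names are the illustrative ones from the docs, and it assumes the elasticsearch-hadoop connector jar is on the Spark classpath):

```python
from pyspark.sql import SparkSession

spark = SparkSession.builder.appName("dynamic-es-write").getOrCreate()

# Each row carries the media_type field that the {media_type} pattern
# in es.resource is resolved from, per document, at write time.
docs = [
    ("game", "FF VI", "1994"),
    ("book", "Harry Potter", "2010"),
    ("music", "Surfing With The Alien", None),
]
df = spark.createDataFrame(docs, ["media_type", "title", "year"])

# Rows get routed to my-collection-game, my-collection-book, my-collection-music.
(df.write
    .format("org.elasticsearch.spark.sql")
    .option("es.resource", "my-collection-{media_type}/doc")
    .save())
```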
and here is my line of code:
```python
index = str(file_path.split("/")[-2])
```
I have JSON in a `result` variable, and I'm adding the index to it as a key/value pair:

```python
result['index'] = index
```
```python
df.writeStream \
    .option("checkpointLocation", "/home/user/chkpt_stream6") \
    .outputMode("append") \
    .format("org.elasticsearch.spark.sql") \
    .option("checkpointLocation", "/home/user/spark_chkpt9") \
    .option("es.resource", "{index}/default") \
    .start()
```

But I'm getting this error:

```
org.elasticsearch.hadoop.EsHadoopIllegalArgumentException: Cannot find match for {index}/default
```
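My understanding of the docs above is that the pattern is resolved from fields inside each document, so presumably the index value has to be an actual column of the streaming DataFrame rather than a key I set in a Python-side dict. Here's a sketch of what I think that would look like (the `index` column name, the paths, and the use of `input_file_name` are just my guesses):

```python
from pyspark.sql import functions as F

# Derive the index from the source file's path as a real DataFrame column,
# so the connector can resolve {index} for every document it writes.
df_with_index = df.withColumn(
    "index",
    F.element_at(F.split(F.input_file_name(), "/"), -2),
)

(df_with_index.writeStream
    .outputMode("append")
    .format("org.elasticsearch.spark.sql")
    .option("checkpointLocation", "/home/user/chkpt_stream6")
    .option("es.resource", "{index}/default")
    .start())
```

If that's the right direction, is there a recommended way to keep the extra `index` field out of the stored documents (I've seen `es.mapping.exclude` mentioned)?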

Any help would be really appreciated!
