Dynamic indexing in Spark Structured Streaming

I'm using Spark Structured Streaming to read many files recursively and write them into Elasticsearch. I want to know whether there is a way to create the index at runtime according to the folder Spark is reading the file from. For example, if Spark reads a file from the client1 directory, it should be indexed in Elasticsearch as client1.

I've read https://www.elastic.co/guide/en/elasticsearch/hadoop/master/configuration.html#cfg-multi-writes, where this is done quite easily, but the problem is that I'm using Spark Structured Streaming, which has no saveToEs method; instead there is writeStream. What I want is to read files recursively from different directories and, when writing them into Elasticsearch, derive the index name from the file path. For example, a file at file:/home/usr/client/message/single should be indexed as "client-message-single".

file_path = "file://home/usr/client/email/message/singlemessage"

file_split_path = file_path.split('/')

index = str(file_split_path[-4] + '-' + file_split_path[-3] + '-' + file_split_path[-2])

myjson['doc-index'] = index  # adding the key/value to my JSON

df = (myjson.writeStream
      .option("es.resource.write", "{doc-index}/default")
      .outputMode("append")
      .format("org.elasticsearch.spark.sql")
      .start())
df.awaitTermination()
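For reference, the path-splitting logic runs as a self-contained snippet; it is worth checking what those negative indices actually pick out of the sample path:

```python
# The index derivation from the post above, runnable on its own.
file_path = "file://home/usr/client/email/message/singlemessage"
file_split_path = file_path.split('/')
# -> ['file:', '', 'home', 'usr', 'client', 'email', 'message', 'singlemessage']
index = file_split_path[-4] + '-' + file_split_path[-3] + '-' + file_split_path[-2]
print(index)  # client-email-message
```

Note that with this sample path the last component ("singlemessage") is not included in the result, so the slice indices may need adjusting depending on which path segments should form the index name.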

While using the above technique I'm getting the following error:

org.elasticsearch.hadoop.EsHadoopIllegalArgumentException: Cannot find match for {doc-index}/default
    at org.elasticsearch.hadoop.serialization.bulk.BulkEntryWriter.writeBulkEntry(BulkEntryWriter.java:136)
    at org.elasticsearch.hadoop.rest.RestRepository.writeToIndex(RestRepository.java:170)
    at org.elasticsearch.spark.rdd.EsRDDWriter.write(EsRDDWriter.scala:74)
    at org.elasticsearch.spark.sql.streaming.EsStreamQueryWriter.run(EsStreamQueryWriter.scala:41)
    at org.elasticsearch.spark.sql.streaming.EsSparkSqlStreamingSink$$anonfun$addBatch$2$$anonfun$2.apply(EsSparkSqlStreamingSink.scala:52)
    at org.elasticsearch.spark.sql.streaming.EsSparkSqlStreamingSink$$anonfun$addBatch$2$$anonfun$2.apply(EsSparkSqlStreamingSink.scala:51)

There is "doc-index": "client-message-singlemessage" in the JSON that Spark Structured Streaming reads.

This may be something strange with the Python client (which I'm assuming you are using instead of the Scala/Java API). I was able to get a constant value added on a stream by doing the following in Scala:

df.withColumn("doc-index", org.apache.spark.sql.functions.lit("client-message-singlemessage"))
  .writeStream
  .option("es.resource.write", "{doc-index}")
  .option("checkpointLocation", "/tmp/checkpoint/test")
  .format("org.elasticsearch.spark.sql")
  .start()

Perhaps you need to do something similar in Python?
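In PySpark the same idea might look like the sketch below. This is untested; `df` is assumed to be a streaming DataFrame, and the helper function, index name, and checkpoint path are my own illustrations, not from the post above:

```python
def constant_index_name(client, topic, message):
    """Build an index name like 'client-message-singlemessage' (illustrative helper)."""
    return "-".join([client, topic, message])

def write_with_doc_index(df, index_name, checkpoint="/tmp/checkpoint/test"):
    """PySpark analogue of the Scala snippet: attach a constant doc-index
    column with lit(), then let es-hadoop resolve the {doc-index} pattern
    per document."""
    from pyspark.sql import functions as F  # lazy import; requires pyspark

    return (df.withColumn("doc-index", F.lit(index_name))
              .writeStream
              .option("es.resource.write", "{doc-index}")
              .option("checkpointLocation", checkpoint)
              .format("org.elasticsearch.spark.sql")
              .start())
```

The key point either way is that `es.resource.write` with a `{field}` pattern is resolved per document, so every document in the stream must actually contain a non-null `doc-index` field, or the "Cannot find match" error above is raised.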

This topic was automatically closed 28 days after the last reply. New replies are no longer allowed.