I'm currently working with Spark Structured Streaming, reading data from Kafka and writing the output to an Elasticsearch sink. Everything works fine, except that I would like the index name to be generated from the current date/time, as can be done in Logstash for instance.
Is there any way to do this? Below is my code for reference (PySpark), latest attempt:
from datetime import datetime

def generateIndexName():
    return "es_spark_" + datetime.now().strftime("%Y_%m_%d__%H_%M")

query = kafka_stream.writeStream \
    .outputMode("append") \
    .queryName("writing_to_es") \
    .format("org.elasticsearch.spark.sql") \
    .option("checkpointLocation", "C:/TEMP/") \
    .option("es.resource", generateIndexName() + "/es_spark") \
    .option("es.nodes", "192.168.1.1:9200") \
    .start()

query.awaitTermination()
The index name is generated only once, when the code is first executed: generateIndexName() is evaluated while the query options are being set, so the resulting string is fixed for the lifetime of the query and the index name is never modified afterwards.
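To illustrate what I'm after: the name should be recomputed each time data is actually written, e.g. once per micro-batch. A minimal sketch of that idea, assuming Spark 2.4+'s foreachBatch sink (index_name_for and write_batch are hypothetical names I made up for illustration):

```python
from datetime import datetime

def index_name_for(ts):
    # Hypothetical helper: Logstash-style index name for a given timestamp.
    return "es_spark_" + ts.strftime("%Y_%m_%d__%H_%M")

# Sketch only, assuming foreachBatch (Spark 2.4+) and the elasticsearch-spark
# connector's batch write path; not tested end to end:
#
# def write_batch(df, epoch_id):
#     # Called once per micro-batch, so the name is re-evaluated every time.
#     df.write \
#         .format("org.elasticsearch.spark.sql") \
#         .option("es.nodes", "192.168.1.1:9200") \
#         .mode("append") \
#         .save(index_name_for(datetime.now()) + "/es_spark")
#
# query = kafka_stream.writeStream.foreachBatch(write_batch).start()

print(index_name_for(datetime(2020, 1, 2, 3, 4)))  # es_spark_2020_01_02__03_04
```

I don't know whether this is the idiomatic way, which is why I'm asking.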
Thanks in advance for any ideas on how to achieve this.