[Spark Structured Streaming] Elasticsearch sink index name

Hello,

I'm currently working with Spark Structured Streaming, reading data from Kafka and writing the output to an Elasticsearch sink. Everything works fine, except that I would like the index name to be generated from the current date/time, as can be done in Logstash, for instance.
Is there any way to do this? Below is my latest attempt (PySpark) for reference:

from datetime import datetime

def generateIndexName():
    # Index name based on the time this function is called, e.g. "es_spark_2018_05_17__10_30".
    return "es_spark_" + datetime.now().strftime("%Y_%m_%d__%H_%M")

query = kafka_stream.writeStream \
    .outputMode("append") \
    .queryName("writing_to_es") \
    .format("org.elasticsearch.spark.sql") \
    .option("checkpointLocation", "C:/TEMP/") \
    .option("es.resource", generateIndexName() + "/es_spark") \
    .option("es.nodes", "192.168.1.1:9200") \
    .start()
query.awaitTermination()

The index name is generated once, when the code is first executed, and is never updated after that.

Thanks in advance for any ideas on how to solve this.

Regards.

Vincent.

In Logstash the index name is determined from the @timestamp field of every event, which does not seem to be what you are doing here. I do not know how to achieve that in your case, so I will leave that for someone else to comment on, but I would strongly advise against creating an index per minute, as that is likely to cause a lot of problems.

Thanks for your reply. You're right: using the timestamp of the event could be a better idea than using the time the event is processed by Spark.

Actually the "one index per minute" was configured only for testing purpose to check the frequency of index creation.

As Christian mentioned above, ES-Hadoop will not create an index name from the current time; instead, it lets you name a field on your document to use in the index name. This field can contain a timestamp, which is usually what users want when they are using time-based indices.
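
For reference, here is a minimal sketch of that field-based pattern. The field name event_date is just an example, and this assumes each document already carries such a field:

# ES-Hadoop substitutes each document's event_date value into the index name.
query = kafka_stream.writeStream \
    .outputMode("append") \
    .format("org.elasticsearch.spark.sql") \
    .option("checkpointLocation", "C:/TEMP/") \
    .option("es.resource", "es_spark_{event_date}/es_spark") \
    .option("es.nodes", "192.168.1.1:9200") \
    .start()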

Thank you James.
My concern is about index size, since I have to store several hundred gigabytes per day, and I don't want to restart my Spark Streaming job every hour or so just to change the index name.
Still, since you said ES-Hadoop cannot dynamically generate the index name from the current time, I assume there is no other workaround.

Might it be possible to create a rollover index and index into that? It would give you a fixed alias to index into, while the switch to a new index happens in the background based on size and/or age.
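
Something along these lines, as a rough sketch: the index name, alias name, and conditions below are made up for illustration, and the available rollover conditions depend on your Elasticsearch version.

import requests

ES = "http://192.168.1.1:9200"

# Bootstrap the first index behind a write alias.
requests.put(ES + "/es_spark-000001", json={"aliases": {"es_spark_write": {}}})

# Roll over once the current index is a day old or reaches 50 GB. Run this
# periodically (e.g. from cron), not from the streaming job itself.
requests.post(ES + "/es_spark_write/_rollover",
              json={"conditions": {"max_age": "1d", "max_size": "50gb"}})

The Spark job would then keep writing to a fixed es.resource of "es_spark_write/es_spark" and would never need restarting.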


@VincentL There shouldn't be anything to stop you from adding a final step to your spark job that adds the current time to a field on your document, and then using that field in your index name pattern for ES-Hadoop. If you don't want that time field to end up in ES, you can tell ES-Hadoop to not include it in the final document by setting es.mapping.exclude to that field name.
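
A minimal sketch of that final step, assuming the job shown earlier in the thread; the field name index_ts is invented for the example:

from pyspark.sql.functions import current_timestamp, date_format

# Stamp each row with the current processing time, formatted for the index name.
stamped = kafka_stream.withColumn(
    "index_ts", date_format(current_timestamp(), "yyyy_MM_dd"))

# es.mapping.exclude keeps the helper field out of the stored document.
query = stamped.writeStream \
    .outputMode("append") \
    .format("org.elasticsearch.spark.sql") \
    .option("checkpointLocation", "C:/TEMP/") \
    .option("es.resource", "es_spark_{index_ts}/es_spark") \
    .option("es.mapping.exclude", "index_ts") \
    .option("es.nodes", "192.168.1.1:9200") \
    .start()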


Thank you very much @james.baiera. I searched the documentation to figure out how to use a field this way, and eventually found the answer here: https://www.elastic.co/guide/en/elasticsearch/hadoop/current/spark.html#spark-streaming-write-dyn
I already have several fields in my documents providing timestamps; I just had to tweak one a little so it could be parsed properly.
@Christian_Dahlqvist: thanks for the rollover index suggestion, which might do the job as well. Still, I wanted to keep this mechanism inside Spark, for no particularly good reason.
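
For completeness, here is roughly the shape of the solution, as a sketch with event_ts standing in for my actual timestamp field; the page above describes a date format embedded in the placeholder after a pipe:

# Each event is routed to a daily index derived from its own event_ts value.
query = kafka_stream.writeStream \
    .outputMode("append") \
    .format("org.elasticsearch.spark.sql") \
    .option("checkpointLocation", "C:/TEMP/") \
    .option("es.resource", "es_spark_{event_ts|yyyy.MM.dd}/es_spark") \
    .option("es.nodes", "192.168.1.1:9200") \
    .start()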
