Spark Structured Streaming timeseries indices

I have a follow-up question to the thread "[Spark Structured streaming] Elasticsearch sink index name".

James said, "As Christian has mentioned above, ES-Hadoop will not create an index name using the current time, but rather will let you give a field name on your document to use in the index name creation. This field can contain a timestamp, which is usually what users want when they are using time based indices."

Which field does ES-Hadoop (or possibly Elasticsearch itself) look at when creating the index name, and how do we configure which field to use?

Is there related documentation or an example? The example at https://www.elastic.co/guide/en/elasticsearch/hadoop/current/spark.html#spark-sql-streaming-write-scala seems to write everything into a single fixed index named spark. Is that right?

I had to combine two examples (one and two) to get timeseries indices working with Structured Streaming.

In Scala it looks something like this:

import org.apache.spark.sql.functions.{col, date_format}

val df = spark.readStream.schema(schema)
  .parquet("/path-to/*.parquet") // assumes the Parquet files have a `timestamp` column
  .withColumn("@timestamp", date_format(col("timestamp"), "yyyy")) // format the field the way you want it to appear in the index name

val stream = df.writeStream
  .option("checkpointLocation", "/save/location")
  .format("es")
  .start("timeseries-{@timestamp}/doc") // {@timestamp} is filled in per document from that field's value
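To make the placeholder mechanics concrete, here is a minimal plain-Scala sketch of how a pattern like timeseries-{@timestamp}/doc could be resolved against a single document's fields. It uses neither Spark nor ES-Hadoop; IndexPatternSketch and resolve are hypothetical names, and ES-Hadoop's real resolver may differ in details such as error handling.

```scala
// Hypothetical sketch: resolving "{field}" placeholders in an index pattern
// against one document's fields. This is NOT ES-Hadoop's implementation.
import scala.util.matching.Regex

object IndexPatternSketch {
  // Matches a {fieldName} placeholder; the field name is captured in group 1.
  private val Placeholder: Regex = """\{([^}]+)\}""".r

  /** Replaces every {field} in `pattern` with that field's value from `doc`.
    * Throws IllegalArgumentException if a referenced field is missing. */
  def resolve(pattern: String, doc: Map[String, Any]): String =
    Placeholder.replaceAllIn(pattern, m => {
      val field = m.group(1)
      val value = doc.getOrElse(field,
        throw new IllegalArgumentException(s"document has no field '$field'"))
      // quoteReplacement stops '$' or '\' in the value being read as group references
      Regex.quoteReplacement(value.toString)
    })

  def main(args: Array[String]): Unit = {
    // Two documents whose @timestamp values differ end up in different indices.
    val docs = Seq(
      Map[String, Any]("@timestamp" -> "2022", "message" -> "old"),
      Map[String, Any]("@timestamp" -> "2023", "message" -> "new")
    )
    docs.foreach(d => println(resolve("timeseries-{@timestamp}/doc", d)))
  }
}
```

Because the value is looked up per document, each row in the stream can land in a different index, which is why the date_format column above controls the granularity (yearly here).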

As you've already found out, any field on a document can be used to build the index name. A simple streaming example can be found on the "Apache Spark support" page of the Elasticsearch for Apache Hadoop [8.11] documentation.
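The ES-Hadoop configuration docs also describe formatting a date field directly inside the pattern, for example {@timestamp|yyyy.MM.dd}, which may make the extra date_format column unnecessary (verify this against your ES-Hadoop version and the type of your timestamp field). The plain-Scala sketch below, with the hypothetical name DatedIndexSketch, only illustrates what such a placeholder does: it reads a timestamp field and formats it with the given date pattern. Formatting in UTC is an assumption of this sketch, and it handles only the {field|pattern} form.

```scala
// Hypothetical sketch of a "{field|datePattern}" placeholder. NOT ES-Hadoop's code.
import java.time.{Instant, ZoneOffset}
import java.time.format.DateTimeFormatter
import scala.util.matching.Regex

object DatedIndexSketch {
  // Group 1 = field name, group 2 = date pattern after the '|'.
  private val Placeholder: Regex = """\{([^}|]+)\|([^}]+)\}""".r

  /** Replaces each {field|pattern} with doc(field) formatted by `pattern` in UTC.
    * Throws NoSuchElementException if the field is missing. */
  def resolve(pattern: String, doc: Map[String, Instant]): String =
    Placeholder.replaceAllIn(pattern, m => {
      val ts  = doc(m.group(1))
      val fmt = DateTimeFormatter.ofPattern(m.group(2)).withZone(ZoneOffset.UTC)
      Regex.quoteReplacement(fmt.format(ts))
    })

  def main(args: Array[String]): Unit = {
    val doc = Map("@timestamp" -> Instant.parse("2023-11-20T10:15:30Z"))
    println(resolve("timeseries-{@timestamp|yyyy.MM.dd}", doc))
  }
}
```

Keeping the raw timestamp on the document and formatting only in the index name preserves the full-precision @timestamp for queries, whereas overwriting @timestamp with a "yyyy" string loses it.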
