Data ingestion into Elasticsearch from Spark Structured Streaming

Hi,

I have created a Dataset/DataFrame using watermark and window functions, but writing its output to Elasticsearch is not working.

However, a Dataset/DataFrame created without watermark and window inserts data into Elasticsearch without issues.

Please find the code snippet below.

val df = dsLog1
  .withWatermark("time", "3 minutes")
  .groupBy(window(col("time"), "3 minutes", "1 minute"))
  .agg(count(col("column_name")))

df.writeStream
.outputMode("append")
.format("org.elasticsearch.spark.sql")
.option("es.nodes", "localhost")
.option("es.port", "9200")
//.option("checkpointLocation", "/tmp")
.option("es.resource","test4/_doc")
.start()

Thanks in advance for the help.

Welcome to the forums!

I have created dataset/dataframe using the watermark and window function and writing the output to ElasticSearch is not working.

Can you share more detail about what is not working — is the job failing with an error, or is it running but writing no data to the index?
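While waiting for details, one common cause worth checking (an assumption on my part, not confirmed by your post): in append output mode, a windowed aggregate row is only emitted to the sink after the watermark passes the end of its window, so output lags behind the input and can look like "nothing is being written". A small pure-Scala model of that emission rule (independent of Spark; the names are illustrative):

```scala
// Pure-Scala sketch of Structured Streaming's append-mode emission rule
// for windowed aggregates (my reading of the semantics, not your code):
// a window's row is only appended to the sink once the watermark passes
// the window's end time.
import java.time.Instant
import java.time.temporal.ChronoUnit

object AppendModeModel {
  // Watermark = max event time seen so far minus the configured delay.
  def watermark(maxEventTime: Instant, delayMinutes: Long): Instant =
    maxEventTime.minus(delayMinutes, ChronoUnit.MINUTES)

  // A window [start, end) is finalized (emitted) once watermark >= end.
  def isEmitted(windowEnd: Instant, wm: Instant): Boolean =
    !wm.isBefore(windowEnd)
}
```

For example, with 10-minute windows and a 10-minute watermark delay, the 10:00–10:10 window is only emitted once an event with event time past 10:20 arrives; until then the sink stays empty even though the query is healthy.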

Hi James,

Thanks for your reply. Please find the details as below:

I am using Spark v2.4 and Elasticsearch v6.8, and index auto-creation is set to "true".

####### Code snippet that is working #######

val dsIn = spark.readStream
.format(KAFKA_FORMAT)
.option(KAFKA_HOSTS_PROP, Configuration.getStringList(KAFKA_HOSTS_CONF).toArray.mkString(","))
.option(CONSUMER_SUBSCRIBE_PROP, Configuration.getString(CONSUMER_SUBSCRIBE_CONF))
.option(CONSUMER_GROUPID_PROP, Configuration.getString(CONSUMER_GROUPID_CONF))
.option(CONSUMER_ENABLE_AUTOCOMMIT_PROP, Configuration.getBoolean(CONSUMER_ENABLE_AUTOCOMMIT_CONF))
.option(CONSUMER_STARTING_OFFSET_PROP, Configuration.getString(CONSUMER_STARTING_OFFSET_CONF))
.option(CONSUMER_FAILONLOSS_PROP, false)
.load()
.selectExpr("CAST(value AS STRING)")
.as[String]
.select(from_json($"value", inputSchema).as("value"))

dsIn.writeStream
.outputMode("append")
.format("org.elasticsearch.spark.sql")
.option("es.nodes", "localhost")
.option("es.port", "9200")
.option("checkpointLocation", "/tmp")
// .option("es.mapping.id", "id")
//.option("es.resource","alta_alerts_test/testdata")
.start("alta_alerts_dummy_2/_doc")
// .awaitTermination()

####### Code snippet that is not working #######

val dsIn = spark.readStream
.format(KAFKA_FORMAT)
.option(KAFKA_HOSTS_PROP, Configuration.getStringList(KAFKA_HOSTS_CONF).toArray.mkString(","))
.option(CONSUMER_SUBSCRIBE_PROP, Configuration.getString(CONSUMER_SUBSCRIBE_CONF))
.option(CONSUMER_GROUPID_PROP, Configuration.getString(CONSUMER_GROUPID_CONF))
.option(CONSUMER_ENABLE_AUTOCOMMIT_PROP, Configuration.getBoolean(CONSUMER_ENABLE_AUTOCOMMIT_CONF))
.option(CONSUMER_STARTING_OFFSET_PROP, Configuration.getString(CONSUMER_STARTING_OFFSET_CONF))
.option(CONSUMER_FAILONLOSS_PROP, false)
.load()
.selectExpr("CAST(value AS STRING)")
.as[String]
.select(from_json($"value", inputSchema).as("value"))

Group by a few columns together with the time window:

val requestCounts = dsIn
  .withWatermark("receivedtime", "10 minutes")
  .groupBy(
    col("a"), col("b"), col("c"), col("d"), col("e"),
    col("f"), col("g"), col("h"), col("i"),
    col("j").as("application"),
    window($"receivedtime", "10 minutes", "5 minutes").as("window"))
  .count()

requestCounts.writeStream
.outputMode("append")
.format("org.elasticsearch.spark.sql")
.option("es.nodes", "localhost")
.option("es.port", "9200")
.option("checkpointLocation", "/tmp")
// .option("es.mapping.id", "id")
//.option("es.resource","alta_alerts_test/testdata")
.start("alta_alerts_dummy_2/_doc")
// .awaitTermination()
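One way to check whether the aggregation itself is producing rows (a debugging sketch, assuming the same `requestCounts` as above; the checkpoint path here is illustrative) is to route the query to the console sink first:

```scala
// Debugging sketch (assumption, not part of the original job): write the
// same aggregate to the console sink. If rows appear here only after event
// time advances past window end + watermark delay, the Elasticsearch sink
// is fine and the "missing" data is just append-mode emission lag.
requestCounts.writeStream
  .outputMode("append")
  .format("console")
  .option("truncate", "false")
  .option("checkpointLocation", "/tmp/console-debug") // use a separate checkpoint per query
  .start()
```

Note that each streaming query needs its own checkpoint location; reusing "/tmp" across queries can make a query silently resume from another query's state.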

Thanks in advance for your help.

Regards,
Sandeep Reddy