Hi James,
Thanks for your reply. Please find the details below:
I am using Spark v2.4 and Elasticsearch v6.8, and index auto-creation is set to "true".
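For reference, this is roughly how the auto-creation setting can be checked and enabled on the cluster (assuming the same localhost:9200 node used in the snippets below):

```shell
# Show the effective value of action.auto_create_index
curl -s 'http://localhost:9200/_cluster/settings?include_defaults=true&flat_settings=true' \
  | grep auto_create_index

# Enable index auto-creation explicitly if it is not already on
curl -s -X PUT 'http://localhost:9200/_cluster/settings' \
  -H 'Content-Type: application/json' \
  -d '{"persistent": {"action.auto_create_index": "true"}}'
```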
####### Code snippet that is working #######
val dsIn = spark.readStream
.format(KAFKA_FORMAT)
.option(KAFKA_HOSTS_PROP, Configuration.getStringList(KAFKA_HOSTS_CONF).toArray.mkString(","))
.option(CONSUMER_SUBSCRIBE_PROP, Configuration.getString(CONSUMER_SUBSCRIBE_CONF))
.option(CONSUMER_GROUPID_PROP, Configuration.getString(CONSUMER_GROUPID_CONF))
.option(CONSUMER_ENABLE_AUTOCOMMIT_PROP, Configuration.getBoolean(CONSUMER_ENABLE_AUTOCOMMIT_CONF))
.option(CONSUMER_STARTING_OFFSET_PROP, Configuration.getString(CONSUMER_STARTING_OFFSET_CONF))
.option(CONSUMER_FAILONLOSS_PROP, false)
.load()
.selectExpr("CAST(value AS STRING)")
.as[String]
.select(from_json($"value", inputSchema).as("value"))
dsIn.writeStream
.outputMode("append")
.format("org.elasticsearch.spark.sql")
.option("es.nodes", "localhost")
.option("es.port", "9200")
.option("checkpointLocation", "/tmp")
// .option("es.mapping.id", "id")
//.option("es.resource","alta_alerts_test/testdata")
.start("alta_alerts_dummy_2/_doc")
// .awaitTermination()
####### Code snippet that is not working #######
val dsIn = spark.readStream
.format(KAFKA_FORMAT)
.option(KAFKA_HOSTS_PROP, Configuration.getStringList(KAFKA_HOSTS_CONF).toArray.mkString(","))
.option(CONSUMER_SUBSCRIBE_PROP, Configuration.getString(CONSUMER_SUBSCRIBE_CONF))
.option(CONSUMER_GROUPID_PROP, Configuration.getString(CONSUMER_GROUPID_CONF))
.option(CONSUMER_ENABLE_AUTOCOMMIT_PROP, Configuration.getBoolean(CONSUMER_ENABLE_AUTOCOMMIT_CONF))
.option(CONSUMER_STARTING_OFFSET_PROP, Configuration.getString(CONSUMER_STARTING_OFFSET_CONF))
.option(CONSUMER_FAILONLOSS_PROP, false)
.load()
.selectExpr("CAST(value AS STRING)")
.as[String]
.select(from_json($"value", inputSchema).as("value"))
Then I group by a few columns over a sliding time window:
val requestCounts = dsIn
.withWatermark("receivedtime", "10 minutes")
.groupBy($"a", $"b", $"c", $"d", $"e", $"f", $"g", $"h", $"i", $"j".as("application"),
  window($"receivedtime", "10 minutes", "5 minutes").as("window"))
.count()
requestCounts.writeStream
.outputMode("append")
.format("org.elasticsearch.spark.sql")
.option("es.nodes", "localhost")
.option("es.port", "9200")
.option("checkpointLocation", "/tmp")
// .option("es.mapping.id", "id")
//.option("es.resource","alta_alerts_test/testdata")
.start("alta_alerts_dummy_2/_doc")
// .awaitTermination()
Thanks in advance for your help.
Regards,
Sandeep Reddy