I have streaming data coming from Kafka, and I apply some filtering to it using Spark Structured Streaming.
val df = streamingData.selectExpr("*").groupBy("date", "location", "name").agg(count("name"))
I want to send the streaming data to Elasticsearch. How can I write this aggregated data to Elasticsearch? I have tried both the complete and update output modes.
Without the aggregation, my streaming data is written to Elasticsearch with output mode append, and that works fine.
But I want to send the aggregated data from each streaming batch to Elasticsearch.
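
To make the goal concrete, here is a minimal sketch of one way this might be wired up, not a confirmed solution. It assumes Spark 2.4 or later (for foreachBatch) and the elasticsearch-spark connector on the classpath. The Kafka broker address, the topic "events", the JSON schema, the index name "aggregates", the "doc_id" and "name_count" columns, and the checkpoint path are all hypothetical placeholders. The idea is to run the aggregation in update mode and write each micro-batch with the connector's ordinary batch writer, upserting on a key built from the grouping columns so that a changed count overwrites the earlier document.

```scala
import org.apache.spark.sql.{DataFrame, SparkSession}
import org.apache.spark.sql.functions.{col, concat_ws, count, from_json}
import org.apache.spark.sql.types.{StringType, StructType}

object AggregatesToEs {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder().appName("kafka-agg-to-es").getOrCreate()

    // Hypothetical schema for the JSON payload in the Kafka message value.
    val schema = new StructType()
      .add("date", StringType)
      .add("location", StringType)
      .add("name", StringType)

    // Assumed broker address and topic name.
    val streamingData = spark.readStream
      .format("kafka")
      .option("kafka.bootstrap.servers", "localhost:9092")
      .option("subscribe", "events")
      .load()
      .select(from_json(col("value").cast("string"), schema).as("e"))
      .select("e.*")

    // Same aggregation as in the question, with a named count column.
    val df = streamingData
      .groupBy("date", "location", "name")
      .agg(count("name").as("name_count"))

    // Typed as a Scala function so the foreachBatch overload is unambiguous.
    val writeBatch: (DataFrame, Long) => Unit = (batch, batchId) => {
      batch
        // Hypothetical document id built from the grouping keys.
        .withColumn("doc_id", concat_ws("|", col("date"), col("location"), col("name")))
        .write
        .format("org.elasticsearch.spark.sql")
        .option("es.nodes", "localhost")        // assumed Elasticsearch host
        .option("es.port", "9200")              // assumed Elasticsearch port
        .option("es.mapping.id", "doc_id")      // use doc_id as the document _id
        .option("es.write.operation", "upsert") // overwrite counts for existing keys
        .mode("append")
        .save("aggregates")                     // hypothetical index name
    }

    df.writeStream
      .outputMode("update") // emit only the groups whose count changed in this batch
      .option("checkpointLocation", "/tmp/agg-to-es-checkpoint")
      .foreachBatch(writeBatch)
      .start()
      .awaitTermination()
  }
}
```

Older Elasticsearch versions may need the resource written as "index/type" rather than just an index name, so the save("aggregates") call would have to be adjusted for those.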