ESSpark : Callback function on success and fail

(Akshay Mhetre) #1


I am trying to save rdd in ES using :
EsSpark.saveJsonToEs(rdd, "logstash-{@timestamp:YYYY.MM.dd}/logs"))
This rdd I am getting from a kafka stream created using kafka direct api. As this api. enforces you to store offset in zookeeper by yourself, I want to save offset whenever the message is successfully sent to ES. However, as this rdd might contain set of messages and there could be chances of failure in between., I need a method such that I can call save offset on each successful message sent.


EsSpark.saveJsonToEs(rdd, "logstash-{@timestamp:YYYY.MM.dd}/logs")).onEachSuccessfulMsg(str => saveOffsetForMsg(str))

Is there any way with which I can hook to each message sent ?
or is there any api using which I can save each message in ES instead of rdd.


(system) #2