Hi,
I am trying to read messages from Kafka through Spark Streaming and index them into Elasticsearch. I am new to this. I want each line of "df" output to be indexed with "filesystem" and "message" fields. I am generating two different messages for two different Use% ranges; after some help I was able to generate the right message for each Use%, but in Elasticsearch I cannot see any message or index. Pasting my code below:
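For context, the Kafka topic carries lines shaped like typical "df" output (the values below are illustrative, not from my machine); Use% is the second-to-last column, which is why the code indexes row.length - 2:

Filesystem     1K-blocks     Used Available Use% Mounted on
/dev/sda1       41251136 30874492   8259584  79% /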
package rnd
import org.apache.spark.sql.SQLContext
import org.apache.spark.streaming.{Minutes, Seconds, StreamingContext}
import org.apache.spark.{SparkConf, SparkContext}
import org.elasticsearch.spark.sql._
import org.elasticsearch.spark._
object WordFind {
def main(args: Array[String]) {
val conf = new SparkConf().setMaster("local[*]").setAppName("KafkaReceiver")
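// es-hadoop reads its settings from this SparkConf; "es.nodes" defaults to localhost:9200,
// so set it here (e.g. conf.set("es.nodes", "<host>")) if Elasticsearch is remote.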
val sc = new SparkContext(conf)
//val checkpointDir = "/usr/local/kafka/kafka_2.11-0.11.0.2/checkpoint/"
val batchIntervalSeconds = 2
//val ssc = new StreamingContext(conf, Seconds(10))
import org.apache.spark.streaming.kafka.KafkaUtils
import org.apache.spark.streaming.dstream.ReceiverInputDStream
val ssc = new StreamingContext(sc, Seconds(batchIntervalSeconds))
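// Receiver-based Kafka stream: arguments are the ZooKeeper quorum ("host:port"),
// the consumer group id, and a Map of topic -> number of receiver threads.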
val kafkaStream: ReceiverInputDStream[(String, String)] = KafkaUtils.createStream(ssc, "localhost:2181",
"spark-streaming-consumer-group", Map("wordcounttopic" -> 5))
import org.apache.spark.streaming.dstream.DStream
val filteredStream: DStream[Array[String]] = kafkaStream
  .filter(!_._2.contains("Filesystem")) // drop the header line
  .map(_._2.split("\\s+")) // split on whitespace
val outputDStream: DStream[Map[String, String]] = filteredStream.map { row =>
  val useIdx = row.length - 2 // Use% is the second-to-last column of df output
  // if Use% > 70: "Increase ROM size by 20%"
  // if Use% < 30: "Decrease ROM size by 25%"
  val usePercent = row(useIdx).replace("%", "").toInt
  val message = usePercent match {
    case x if x > 70 => "Increase ROM size by 20%"
    case x if x < 30 => "Decrease ROM size by 25%"
    case _ => "Undefined"
  }
  // emit a document carrying both the filesystem name and the message
  Map("filesystem" -> row(0), "message" -> message)
}
outputDStream.print()
outputDStream.foreachRDD { messageRDD =>
  messageRDD.saveToEs("dfvalueoperations_v1/kwc")
}
//outputDStream.saveToEs("kafkawordcount_v1/kwc")
// To make sure data is not deleted by the time we query it interactively
ssc.remember(Minutes(1))
//ssc.checkpoint(checkpointDir)
// This starts the streaming context in the background.
ssc.start()
// Block for up to 5 batch intervals so the streaming job has time to run, then time out.
ssc.awaitTerminationOrTimeout(batchIntervalSeconds * 5 * 1000)
}
}
Expected output:
Filesystem   Message
dev          Increase ROM size by 20%
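To check whether anything was indexed, I believe a read-back like this should work (a minimal sketch, assuming the same es-hadoop connector on the classpath and Elasticsearch at the default localhost:9200):

import org.apache.spark.sql.SQLContext
import org.elasticsearch.spark.sql._

val sqlContext = new SQLContext(sc)
// esDF is added to SQLContext by the org.elasticsearch.spark.sql package object
val esDf = sqlContext.esDF("dfvalueoperations_v1/kwc")
esDf.show() // expected columns: filesystem, message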
Please help me with this; it's urgent.