Unable to create index and view message sent from Spark


(Sauraj) #1

Hi,
I am trying to read messages from Kafka through Spark and index them into Elasticsearch. I am new to this. I want the output of the "df" command to be stored as two fields, "filesystem" and "message". I generate two different messages depending on the Use% value. After some help I was able to generate the message for the different Use% values, but in Elasticsearch I cannot see the message or the index. My code is pasted below:

package rnd

import org.apache.spark.sql.SQLContext

import org.apache.spark.streaming.{Minutes, Seconds, StreamingContext}

import org.apache.spark.{SparkConf, SparkContext}
import org.elasticsearch.spark.sql._
import org.elasticsearch.spark._

object WordFind {
def main(args: Array[String]) {
}

import org.apache.spark.SparkConf

val conf = new SparkConf().setMaster("local[*]").setAppName("KafkaReceiver")
val sc = new SparkContext(conf)
//val checkpointDir = "/usr/local/kafka/kafka_2.11-0.11.0.2/checkpoint/"

import org.apache.spark.streaming.StreamingContext
import org.apache.spark.streaming.Seconds

val batchIntervalSeconds = 2
//val ssc = new StreamingContext(conf, Seconds(10))
import org.apache.spark.streaming.kafka.KafkaUtils
import org.apache.spark.streaming.dstream.ReceiverInputDStream

val ssc = new StreamingContext(sc, Seconds(batchIntervalSeconds))
val kafkaStream: ReceiverInputDStream[(String, String)] = KafkaUtils.createStream(ssc, "localhost:2181",
"spark-streaming-consumer-group", Map("wordcounttopic" -> 5))

import org.apache.spark.streaming.dstream.DStream

val filteredStream: DStream[Array[String]] = kafkaStream
.filter(!_._2.contains("Filesystem")) // eliminate header
.map(_._2.split("\\s+")) // split on whitespace
val outputDStream: DStream[String] = filteredStream.map {
row =>
val useIdx = row.length - 2
// If Use% > 70, message: Increase ROM size by 20%
// If Use% < 30, message: Decrease ROM size by 25%
val usePercent = row(useIdx).replace("%", "").toInt
usePercent match {
case x if x > 70 => "Increase ROM size by 20%"
case x if x < 30 => "Decrease ROM size by 25%"
case _ => "Undefined"
usePercent.toString
}

}
import org.elasticsearch.spark.sql._
outputDStream.print()
outputDStream.foreachRDD{messageRDD =>
messageRDD.saveToEs("dfvalueoperations_v1/kwc")

}

//outputDStream.saveToEs("kafkawordcount_v1/kwc")

// To make sure data is not deleted by the time we query it interactively
ssc.remember(Minutes(1))
//ssc.checkpoint(checkpointDir)
ssc
// }
// This starts the streaming context in the background.
ssc.start()
// This is to ensure that we wait for some time before the background streaming job starts. This will put this cell on hold for 5 times the batchIntervalSeconds.
ssc.awaitTerminationOrTimeout(batchIntervalSeconds * 5 * 1000)
}

Expected output:
Filesystem   Message
dev          Increase ROM size by 20%
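
To make the expected output concrete, here is a minimal sketch of the per-line transformation it implies: each `df` line becomes a small document with a "filesystem" field and a "message" field, rather than a bare string. The object name `DfMessage`, the helper `toDocument`, and the sample `df` lines are hypothetical and only for illustration; the thresholds and message texts are taken from the code above. It assumes the standard `df` layout, where Use% is the second-to-last column.

```scala
// Hypothetical helper, not part of the original job: turns one line of `df`
// output into a document with the two fields named in the expected output.
object DfMessage {

  // Returns None for the header line and for lines that cannot be parsed.
  def toDocument(line: String): Option[Map[String, String]] = {
    val cols = line.trim.split("\\s+")
    if (cols.length < 2 || cols.head == "Filesystem") None
    else {
      // Assumption: Use% is the second-to-last column, as in the code above.
      val useField = cols(cols.length - 2).replace("%", "")
      scala.util.Try(useField.toInt).toOption.map { usePercent =>
        val message =
          if (usePercent > 70) "Increase ROM size by 20%"
          else if (usePercent < 30) "Decrease ROM size by 25%"
          else "Undefined"
        Map("filesystem" -> cols.head, "message" -> message)
      }
    }
  }

  def main(args: Array[String]): Unit = {
    // Made-up sample lines, for illustration only.
    val sample = Seq(
      "Filesystem     1K-blocks    Used Available Use% Mounted on",
      "dev              4028332 3500000    528332  87% /dev"
    )
    sample.flatMap(toDocument).foreach(println)
  }
}
```

In the streaming job, something like `kafkaStream.flatMap(kv => DfMessage.toDocument(kv._2))` would then yield a stream of maps instead of strings. With `org.elasticsearch.spark._` imported, `saveToEs` on an RDD of maps writes each map as one document, so the two keys would become the indexed fields. Whether this alone resolves the missing index depends on the cluster and connector settings, which are not shown here.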

Please help me with this. It's urgent.

