I'm getting the following error from a Spark Streaming job that writes documents to Elasticsearch:
17/04/10 21:14:06 INFO spark.SparkContext: Job finished: take at DStream.scala:593, took 0.138457215 s
17/04/10 21:14:06 INFO scheduler.JobScheduler: Finished job streaming job 1491876846000 ms.0 from job set of time 1491876846000 ms
17/04/10 21:14:06 INFO scheduler.JobScheduler: Starting job streaming job 1491876846000 ms.1 from job set of time 1491876846000 ms
Exception in thread "pool-13-thread-39" java.lang.NoSuchFieldError: USE_ANNOTATIONS
at org.elasticsearch.hadoop.rest.RestClient.&lt;init&gt;(RestClient.java:80)
at org.elasticsearch.hadoop.rest.RestRepository.&lt;init&gt;(RestRepository.java:113)
at org.elasticsearch.hadoop.rest.InitializationUtils.checkIndexExistence(InitializationUtils.java:267)
at org.elasticsearch.spark.rdd.EsSpark$.doSaveToEs(EsSpark.scala:100)
at org.elasticsearch.spark.streaming.EsSparkStreaming$$anonfun$doSaveToEs$1.apply(EsSparkStreaming.scala:71)
at org.elasticsearch.spark.streaming.EsSparkStreaming$$anonfun$doSaveToEs$1.apply(EsSparkStreaming.scala:71)
at org.apache.spark.streaming.dstream.DStream$$anonfun$foreachRDD$1.apply(DStream.scala:527)
at org.apache.spark.streaming.dstream.DStream$$anonfun$foreachRDD$1.apply(DStream.scala:527)
at org.apache.spark.streaming.dstream.ForEachDStream$$anonfun$1.apply$mcV$sp(ForEachDStream.scala:41)
at org.apache.spark.streaming.dstream.ForEachDStream$$anonfun$1.apply(ForEachDStream.scala:40)
at org.apache.spark.streaming.dstream.ForEachDStream$$anonfun$1.apply(ForEachDStream.scala:40)
at scala.util.Try$.apply(Try.scala:161)
at org.apache.spark.streaming.scheduler.Job.run(Job.scala:32)
at org.apache.spark.streaming.scheduler.JobScheduler$JobHandler.run(JobScheduler.scala:172)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
at java.lang.Thread.run(Thread.java:745)
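For context, the job is built against roughly the following dependency set (sbt syntax; the version numbers here are placeholders rather than the exact ones in my build, but the artifacts correspond to the imports in the code below):

// build.sbt (sketch) -- versions are placeholders, not necessarily what my build uses
scalaVersion := "2.10.6"

libraryDependencies ++= Seq(
  "org.apache.spark"  %% "spark-core"                 % "1.6.3",
  "org.apache.spark"  %% "spark-streaming"            % "1.6.3",
  "org.apache.spark"  %% "spark-streaming-kafka"      % "1.6.3",
  // artifact expected to provide org.elasticsearch.spark.streaming._ (saveToEs on DStreams)
  "org.elasticsearch"  % "elasticsearch-spark-13_2.10" % "5.3.0",
  "edu.stanford.nlp"   % "stanford-corenlp"            % "3.6.0",
  "edu.stanford.nlp"   % "stanford-corenlp"            % "3.6.0" classifier "models"
)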
This is my code:
package sentimentanalyzer
import java.util.Properties
import edu.stanford.nlp.ling.CoreAnnotations
import edu.stanford.nlp.neural.rnn.RNNCoreAnnotations
import org.apache.spark.streaming.kafka.KafkaUtils
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.{SparkConf, SparkContext}
import sentimentanalyzer.Sentiment.Sentiment
import org.elasticsearch.spark.streaming._
import edu.stanford.nlp.pipeline.{Annotation, StanfordCoreNLP}
import edu.stanford.nlp.sentiment.SentimentCoreAnnotations
import scala.collection.convert.wrapAll._
/**
* Created by Dungeoun on 4/8/17.
*/
case class MemT(timestamp: String, Text: String, Location: String, Keyword: String)
object sentanal {
  def main(args: Array[String]): Unit = {
    val (zkQuorum, group, topics, numThreads) = ("localhost", "localhost", "test", "20")

    val sparkConf = new SparkConf()
      .setMaster("local[*]")
      .setSparkHome("/usr/local/spark")
      .setAppName("sentanal")
      .set("es.index.auto.create", "false")
      .set("es.nodes", "localhost")
      .set("es.port", "9200")
      .set("es.resource", "spark/docs")
      .set("es.http.timeout", "5m")
      .set("es.scroll.size", "50")

    // the StreamingContext builds its own SparkContext from the conf
    val ssc = new StreamingContext(sparkConf, Seconds(2))
    ssc.checkpoint("checkpoint")

    val topicMap = topics.split(",").map((_, numThreads.toInt)).toMap
    val lines = KafkaUtils.createStream(ssc, zkQuorum, group, topicMap).map(_._2)

    // records arrive as timestamp::text::location::keyword; the text field is
    // replaced by its sentiment label before indexing
    val text = lines.map(_.split("::"))
      .map(x => MemT(x(0), SentimentAnalyzer.mainSentiment(x(1)).toString, x(2), x(3)))
    text.print()

    // write each micro-batch of MemT records to the spark/docs index
    text.saveToEs("spark/docs")

    ssc.start()
    ssc.awaitTermination()
  }
}
The SentimentAnalyzer object is not included above, for brevity.
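It is essentially a thin wrapper around the Stanford CoreNLP sentiment pipeline, roughly along the lines of the sketch below (simplified for illustration, not my exact code; the Sentiment enum and its toSentiment helper are assumed to live in the same package):

package sentimentanalyzer

import java.util.Properties
import edu.stanford.nlp.ling.CoreAnnotations
import edu.stanford.nlp.neural.rnn.RNNCoreAnnotations
import edu.stanford.nlp.pipeline.{Annotation, StanfordCoreNLP}
import edu.stanford.nlp.sentiment.SentimentCoreAnnotations
import sentimentanalyzer.Sentiment.Sentiment
import scala.collection.convert.wrapAll._

// Simplified sketch of the omitted object, not the exact implementation
object SentimentAnalyzer {
  // one shared CoreNLP pipeline with the annotators the sentiment model needs
  private val props = new Properties()
  props.setProperty("annotators", "tokenize, ssplit, parse, sentiment")
  private val pipeline = new StanfordCoreNLP(props)

  // returns the sentiment of the longest sentence in the input
  def mainSentiment(input: String): Sentiment = Option(input) match {
    case Some(text) if text.nonEmpty => extractSentiment(text)
    case _ => throw new IllegalArgumentException("input can't be null or empty")
  }

  private def extractSentiment(text: String): Sentiment = {
    val annotation: Annotation = pipeline.process(text)
    val (_, sentiment) = annotation
      .get(classOf[CoreAnnotations.SentencesAnnotation])
      .map { sentence =>
        val tree = sentence.get(classOf[SentimentCoreAnnotations.SentimentAnnotatedTree])
        // Sentiment.toSentiment (assumed helper) maps the predicted class (0-4) to the enum value
        (sentence, Sentiment.toSentiment(RNNCoreAnnotations.getPredictedClass(tree)))
      }
      .maxBy { case (sentence, _) => sentence.toString.length }
    sentiment
  }
}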