Getting NoSuchFieldError when using saveToEs in Spark (Scala)

I'm getting the following error when the streaming job tries to write to Elasticsearch with saveToEs:

17/04/10 21:14:06 INFO spark.SparkContext: Job finished: take at DStream.scala:593, took 0.138457215 s
17/04/10 21:14:06 INFO scheduler.JobScheduler: Finished job streaming job 1491876846000 ms.0 from job set of time 1491876846000 ms
17/04/10 21:14:06 INFO scheduler.JobScheduler: Starting job streaming job 1491876846000 ms.1 from job set of time 1491876846000 ms
Exception in thread "pool-13-thread-39" java.lang.NoSuchFieldError: USE_ANNOTATIONS
at org.elasticsearch.hadoop.rest.RestClient.&lt;init&gt;(RestClient.java:80)
at org.elasticsearch.hadoop.rest.RestRepository.&lt;init&gt;(RestRepository.java:113)
at org.elasticsearch.hadoop.rest.InitializationUtils.checkIndexExistence(InitializationUtils.java:267)
at org.elasticsearch.spark.rdd.EsSpark$.doSaveToEs(EsSpark.scala:100)
at org.elasticsearch.spark.streaming.EsSparkStreaming$$anonfun$doSaveToEs$1.apply(EsSparkStreaming.scala:71)
at org.elasticsearch.spark.streaming.EsSparkStreaming$$anonfun$doSaveToEs$1.apply(EsSparkStreaming.scala:71)
at org.apache.spark.streaming.dstream.DStream$$anonfun$foreachRDD$1.apply(DStream.scala:527)
at org.apache.spark.streaming.dstream.DStream$$anonfun$foreachRDD$1.apply(DStream.scala:527)
at org.apache.spark.streaming.dstream.ForEachDStream$$anonfun$1.apply$mcV$sp(ForEachDStream.scala:41)
at org.apache.spark.streaming.dstream.ForEachDStream$$anonfun$1.apply(ForEachDStream.scala:40)
at org.apache.spark.streaming.dstream.ForEachDStream$$anonfun$1.apply(ForEachDStream.scala:40)
at scala.util.Try$.apply(Try.scala:161)
at org.apache.spark.streaming.scheduler.Job.run(Job.scala:32)
at org.apache.spark.streaming.scheduler.JobScheduler$JobHandler.run(JobScheduler.scala:172)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
at java.lang.Thread.run(Thread.java:745)
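
From the stack trace, the failure happens in the RestClient constructor, before any documents are written. As far as I can tell, USE_ANNOTATIONS is a field on the Jackson 1.x (org.codehaus.jackson) configuration classes that elasticsearch-hadoop relies on, so my guess is that an older or incompatible jackson-core-asl / jackson-mapper-asl jar is winning on the classpath. If that guess is right, would pinning the Jackson version be the fix? Something along these lines in build.sbt (the 1.8.8 version below is only an illustration, not what my build actually declares):

// force a single Jackson 1.x version onto the classpath (version number is a placeholder)
dependencyOverrides ++= Set(
  "org.codehaus.jackson" % "jackson-core-asl"   % "1.8.8",
  "org.codehaus.jackson" % "jackson-mapper-asl" % "1.8.8"
)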

This is my code:

package sentimentanalyzer

import java.util.Properties

import edu.stanford.nlp.ling.CoreAnnotations
import edu.stanford.nlp.neural.rnn.RNNCoreAnnotations
import org.apache.spark.streaming.kafka.KafkaUtils
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.{SparkConf, SparkContext}
import sentimentanalyzer.Sentiment.Sentiment
import org.elasticsearch.spark.streaming._
import scala.collection.mutable
import edu.stanford.nlp.pipeline.{Annotation, StanfordCoreNLP}
import edu.stanford.nlp.sentiment.SentimentCoreAnnotations

import scala.collection.convert.wrapAll._

/**
  * Created by Dungeoun on 4/8/17.
  */


case class MemT(timestamp: String, Text: String, Location: String, Keyword: String)

object sentanal {

  def main(args: Array[String]) {

    val (zkQuorum, group, topics, numThreads) = ("localhost", "localhost", "test", "20")
    // Elasticsearch connector settings are passed through the Spark configuration
    val sparkConf = new SparkConf()
      .setMaster("local[*]")
      .setSparkHome("/usr/local/spark")
      .setAppName("sentanal")
      .set("es.index.auto.create", "false")
      .set("es.nodes", "localhost")
      .set("es.port", "9200")
      .set("es.resource", "spark/docs")
      .set("es.http.timeout", "5m")
      .set("es.scroll.size", "50")
    val sc = new SparkContext(sparkConf)
    // reuse the existing SparkContext instead of creating a second one from the same conf
    val ssc = new StreamingContext(sc, Seconds(2))
    ssc.checkpoint("checkpoint")

    val topicMap = topics.split(",").map((_, numThreads.toInt)).toMap
    val lines = KafkaUtils.createStream(ssc, zkQuorum, group, topicMap).map(_._2)
    // messages look like "timestamp::text::location::keyword"; the text field is replaced by its sentiment
    val text = lines.map(_.split("::"))
      .map(x => MemT(x(0), SentimentAnalyzer.mainSentiment(x(1)).toString, x(2), x(3)))

    text.print()
    // write each micro-batch of the DStream straight to Elasticsearch;
    // there is no need to wrap the DStream in an RDD and a queueStream
    text.saveToEs("spark/docs")

    ssc.start()
    ssc.awaitTermination()
  }


}

The SentimentAnalyzer object is omitted for brevity.
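
In case it helps, here is a small check I'm thinking of adding at the top of main() to see which jar the Jackson configuration class is actually loaded from at runtime (the class name is my assumption, based on the field named in the error):

// purely diagnostic: print the code source (jar) that Jackson 1.x is loaded from
val jacksonSource = Class.forName("org.codehaus.jackson.map.SerializationConfig")
  .getProtectionDomain.getCodeSource
println(s"Jackson loaded from: $jacksonSource")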
