Running into an issue where our Spark logs are saying "[B cannot be cast to scala.Tuple2" when the job below writes a Kafka stream to Elasticsearch ([B is the JVM's name for a byte array). Here is our code:
import java.sql.Timestamp
import java.text.{ParsePosition, SimpleDateFormat}
import com.gm.SparkDataIngest.ManifestClasses.{ManifestContainer, ProductManifestDocument}
import com.gm.SparkDataIngest.Merge.{TopicGroup, mergeDoc}
import com.typesafe.config.{Config, ConfigFactory, ConfigObject}
import org.apache.spark.SparkConf
import org.apache.spark.sql.{DataFrame, SparkSession}
import org.apache.spark.sql.functions._
import org.apache.spark.sql.streaming.{GroupState, GroupStateTimeout, OutputMode, Trigger}
import org.apache.spark.sql.types._
import org.elasticsearch.spark.rdd.EsSpark
import org.json4s.JsonAST.{JInt, JString}
import org.json4s.jackson.Serialization
import org.json4s.jackson.Serialization.read
import org.json4s.{CustomSerializer, DefaultFormats, Formats, MappingException}
import sttp.client._
import sttp.client.json4s._
import sttp.model.StatusCode
import scala.annotation.tailrec
import scala.collection.JavaConverters._
import scala.util.{Failure, Success, Try}
object Main {

  implicit val backend: SttpBackend[Try, Nothing, NothingT] = TryHttpURLConnectionBackend()

  // JSON formats, including a custom serializer for Timestamp fields
  implicit val default: Formats = DefaultFormats + new TimestampSerializer

  var conf: Config = _

  /** Entry point of the program; defines both Spark jobs. */
  def main(args: Array[String]): Unit = {
    conf = ConfigFactory.load()

    // Build Spark settings from config
    val sparkConf = new SparkConf(true)
      .setAll(configToMap(conf.getObject("sparkConf")))

    // Build Elasticsearch connector settings from config
    val esConf = new SparkConf(true)
      .setAll(configToMap(conf.getObject("esConf")))

    val spark = SparkSession.builder()
      .config(sparkConf)
      .config(esConf)
      .getOrCreate()

    spark.sparkContext.setLogLevel(conf.getString("spark.logLevel"))

    // Read the raw stream from Kafka
    val outStreamES = spark.readStream
      .format("kafka")
      .option("subscribe", "Our_TOPIC_NAME")
      .options(kafkaConfig)
      .load()

    // Write the stream straight to Elasticsearch; the cast error shows up
    // once this query starts running
    outStreamES.writeStream
      .outputMode("append")
      .format("org.elasticsearch.spark.sql")
      .option("checkpointLocation", "ourcheckpoint location")
      .start("ourindexname/ourdoctype")
      .awaitTermination()

    // Only reached after the query above stops, since awaitTermination() blocks
    spark.streams.awaitAnyTermination()
  }

  // Flatten a ConfigObject into the String -> String map that SparkConf.setAll expects
  def configToMap(c: ConfigObject): Map[String, String] = {
    c.unwrapped().asScala.map(kv => (kv._1, kv._2.toString)).toMap
  }
}
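For reference, the Kafka source always produces the same fixed set of columns regardless of the payload, with key and value arriving as raw binary, which is presumably where the byte array in the exception comes from. Printing the stream's schema shows this:

// Inspect what the Kafka source actually hands us; key and value are binary
outStreamES.printSchema()
// root
//  |-- key: binary (nullable = true)
//  |-- value: binary (nullable = true)
//  |-- topic: string (nullable = true)
//  |-- partition: integer (nullable = true)
//  |-- offset: long (nullable = true)
//  |-- timestamp: timestamp (nullable = true)
//  |-- timestampType: integer (nullable = true)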
This is a sample message that we are trying to read from Kafka:
{"metaData": {"ident": "ZZZZZZZZZ","collectionSource": "rpmii","collectionDateUtc": 1581025940000,"collectionVersion": 2}}