Running into an issue sourcing from Kafka with Spark (Scala) and writing to Elasticsearch

Running into an issue where our Spark logs are saying "[B cannot be cast to scala.Tuple2" ([B is the JVM's internal name for a byte array).

import java.sql.Timestamp
import java.text.{ParsePosition, SimpleDateFormat}

import com.gm.SparkDataIngest.ManifestClasses.{ManifestContainer, ProductManifestDocument}
import com.gm.SparkDataIngest.Merge.{TopicGroup, mergeDoc}
import com.typesafe.config.{Config, ConfigFactory, ConfigObject}
import org.apache.spark.SparkConf
import org.apache.spark.sql.{DataFrame, SparkSession}
import org.apache.spark.sql.functions._
import org.apache.spark.sql.streaming.{GroupState, GroupStateTimeout, OutputMode, Trigger}
import org.apache.spark.sql.types._
import org.elasticsearch.spark.rdd.EsSpark
import org.json4s.JsonAST.{JInt, JString}
import org.json4s.jackson.Serialization
import org.json4s.jackson.Serialization.read
import org.json4s.{CustomSerializer, DefaultFormats, Formats, MappingException}
import sttp.client._
import sttp.client.json4s._
import sttp.model.StatusCode

import scala.annotation.tailrec
import scala.collection.JavaConverters._
import scala.util.{Failure, Success, Try}


object Main {
  implicit val backend: SttpBackend[Try, Nothing, NothingT] = TryHttpURLConnectionBackend()


  // Define JSON date parser
  implicit val default: Formats = DefaultFormats + new TimestampSerializer
  var conf: Config = _

  /** Entry point of program, defines both spark jobs */
  def main(args: Array[String]): Unit = {
    conf = ConfigFactory.load()

    // Build the Spark config from config file entries
    val sparkConf = new SparkConf(true)
      .setAll(configToMap(conf.getObject("sparkConf")))

    // Build the Elasticsearch connector config from config file entries
    val esConf = new SparkConf(true)
      .setAll(configToMap(conf.getObject("esConf")))

    val spark = SparkSession.builder()
      .config(sparkConf)
      .config(esConf)
      .getOrCreate()

    spark.sparkContext.setLogLevel(conf.getString("spark.logLevel"))
    // Kafka connection options; assumption for this snippet: loaded from
    // config the same way as the Spark and Elasticsearch settings above
    val kafkaConfig = configToMap(conf.getObject("kafkaConf"))

    val outStreamES = spark.readStream
      .format("kafka")
      .option("subscribe", "Our_TOPIC_NAME")
      .options(kafkaConfig)
      .load()

    outStreamES.writeStream
      .outputMode("append")
      .format("org.elasticsearch.spark.sql")
      .option("checkpointLocation", "ourcheckpoint location")
      .start("ourindexname/ourdoctype")

    // Block until any active stream terminates
    spark.streams.awaitAnyTermination()
  }

  def configToMap(c: ConfigObject): Map[String, String] = {
    c.unwrapped().asScala.map(kv => (kv._1, kv._2.toString)).toMap
  }
} 
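
For reference, calling outStreamES.printSchema() shows the fixed schema that Spark's Kafka source always produces; key and value arrive as raw byte arrays, which is presumably the [B in the error:

root
 |-- key: binary (nullable = true)
 |-- value: binary (nullable = true)
 |-- topic: string (nullable = true)
 |-- partition: integer (nullable = true)
 |-- offset: long (nullable = true)
 |-- timestamp: timestamp (nullable = true)
 |-- timestampType: integer (nullable = true)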

This is a sample message that we are trying to read from Kafka:
{"metaData": {"ident": "ZZZZZZZZZ","collectionSource": "rpmii","collectionDateUtc": 1581025940000,"collectionVersion": 2}}
