Running into issue with sourcing from Kafka using Spark Scala to write to Elastic Search

Running into an issue where our spark logs are saying B cannot be cast to scala.Tuple2

import java.sql.Timestamp
import java.text.{ParsePosition, SimpleDateFormat}

import{ManifestContainer, ProductManifestDocument}
import{TopicGroup, mergeDoc}
import com.typesafe.config.{Config, ConfigFactory, ConfigObject}
import org.apache.spark.SparkConf
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.{DataFrame, SparkSession}
import org.apache.spark.sql.functions._
import org.apache.spark.sql.streaming.{GroupState, GroupStateTimeout, OutputMode, Trigger}
import org.apache.spark.sql.types._
import org.elasticsearch.spark.rdd.EsSpark
import org.json4s.JsonAST.{JInt, JString}
import org.json4s.jackson.Serialization
import org.json4s.{CustomSerializer, DefaultFormats, Formats, MappingException}
import sttp.client._
import sttp.client.json4s._
import sttp.model.StatusCode

import scala.annotation.tailrec
import scala.collection.JavaConverters._
import scala.util.{Failure, Success, Try}

object Main {
  implicit val backend: SttpBackend[Try, Nothing, NothingT] = TryHttpURLConnectionBackend()

 // Define JSON date parser
  implicit val default: Formats = DefaultFormats + new TimestampSerializer
  var conf: Config = _

  /** Entry point of program, defines both spark jobs */
  def main(args: Array[String]): Unit = {
    conf = ConfigFactory.load()

    // Initialize Spark session with config items
    val sparkConf = new SparkConf(true)

    // Initialize Spark session with config items
    val esConf = new SparkConf(true)

    val spark = SparkSession.builder()

val outStreamES = spark.readStream
      .option("subscribe", "Our_TOPIC_NAME")

      .option("checkpointLocation", "ourcheckpoint location")


  def configToMap(c: ConfigObject): Map[String, String] = {
    c.unwrapped() => (kv._1, kv._2.toString)).toMap

This is a sample message that we are trying to read from kafka.
{"metaData": {"ident": "ZZZZZZZZZ","collectionSource": "rpmii","collectionDateUtc": 1581025940000,"collectionVersion": 2}}

This topic was automatically closed 28 days after the last reply. New replies are no longer allowed.