当前位置: 首页 > 知识库问答 >
问题:

无法序列化foreachRDD中的SparkContext

秋和雅
2023-03-14

我正试图将流数据从Kafka保存到卡桑德拉。我能够读取和解析数据,但是当我调用下面的行来保存数据时,我会得到一个任务不序列化异常。我的类正在扩展序列化,但不确定为什么我会看到这个错误,在谷歌搜索了3个小时后没有得到多少帮助,有没有人能给出任何指示?

val collection = sc.parallelize(Seq((obj.id, obj.data)))
collection.saveToCassandra("testKS", "testTable ", SomeColumns("id", "data"))` 


import org.apache.spark.SparkConf
import org.apache.spark.SparkContext
import org.apache.spark.sql.SaveMode
import org.apache.spark.streaming.Seconds
import org.apache.spark.streaming.StreamingContext
import org.apache.spark.streaming.kafka.KafkaUtils
import com.datastax.spark.connector._

import kafka.serializer.StringDecoder
import org.apache.spark.rdd.RDD
import com.datastax.spark.connector.SomeColumns
import java.util.Formatter.DateTime

object StreamProcessor extends Serializable {
  def main(args: Array[String]): Unit = {
    val sparkConf = new SparkConf().setMaster("local[2]").setAppName("StreamProcessor")
      .set("spark.cassandra.connection.host", "127.0.0.1")

    val sc = new SparkContext(sparkConf)

    val ssc = new StreamingContext(sc, Seconds(2))

    val sqlContext = new SQLContext(sc)

    val kafkaParams = Map("metadata.broker.list" -> "localhost:9092")

    val topics = args.toSet

    val stream = KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder](
      ssc, kafkaParams, topics)

    stream.foreachRDD { rdd =>

      if (!rdd.isEmpty()) {
        try {

          rdd.foreachPartition { iter =>
            iter.foreach {
              case (key, msg) =>

                val obj = msgParseMaster(msg)

                val collection = sc.parallelize(Seq((obj.id, obj.data)))
                collection.saveToCassandra("testKS", "testTable ", SomeColumns("id", "data"))

            }
          }

        }

      }
    }

    ssc.start()
    ssc.awaitTermination()

  }

  import org.json4s._
  import org.json4s.native.JsonMethods._
  case class wordCount(id: Long, data: String) extends serializable
  implicit val formats = DefaultFormats
  def msgParseMaster(msg: String): wordCount = {
    val m = parse(msg).extract[wordCount]
    return m

  }

}

我得到了

sparkException:任务不可序列化

下面是完整的日志

16/08/06 10:24:52错误JobScheduler:运行作业流作业1470504292000 MS.0 org.apache.spark.util.closurecleaner$.ensureclealizable(closurecleaner.scala:304)在org.apache.spark.util.closurecleaner$.org.apache.spark.util.closurecleaner$$clean(closurecleaner.scala:294)在org.apache.spark.util.closurecleaner$.clean(closurecleaner.scala:122)上任务不可序列化sparkcontext.clean(sparkcontext.scala:2055)在org.apache.spark.rdd.rdd$$anonfun$foreachpartition$1。apply(rdd.scala:919)在org.apache.spark.rdd.rdd$$anonfun$foreachpartition$1。apply(rdd.scala:918)在org.apache.spark.rdd.rddoperationscope$.withscope(rddoperationscope.scala:150)在org.apache.spark.rdd.rddoperationscope$.withscope(rddoperationscope.scala:111)在org.apache.spark.rdd.rdd.withscope(在org.apache.spark.rdd.rdd.foreachpartition(rdd.scala:918)上

共有1个答案

谷弘致
2023-03-14

sparkcontext不可序列化,您不能在foreachrdd中使用它,而且从使用您的图来看,您不需要它。相反,您可以简单地映射每个RDD,解析出相关数据并将新RDD保存到Cassandra:

stream
  .map { 
    case (_, msg) => 
      val result = msgParseMaster(msg)
      (result.id, result.data)
   }
  .foreachRDD(rdd => if (!rdd.isEmpty)
                       rdd.saveToCassandra("testKS",
                                           "testTable",
                                            SomeColumns("id", "data")))
 类似资料:
  • 问题内容: 我试图序列化和反序列化内部对象的数组列表: HairList对象也是一个可序列化的对象。 此代码执行返回以下错误: 排队 我不知道我在做什么错。你能给个小费吗? 更新: 解决: 仅使用HairBirt的本机数组而不是ArrayList即可工作: 代替 感谢大家的帮助。 问题答案: 不要使用-而是使用二进制数据并对它进行base64编码,以将其转换为字符串而不会丢失信息。 我强烈怀疑这是

  • 问题内容: 我正在使用Java 8 java.time.LocalDate来解析日期。 但是尝试将LocalDate对象插入到mongodb中。我在Java驱动程序中收到错误: 错误日志: java.lang.RuntimeException:json无法序列化类型:com.mongodb.util.JSONSerializers $ MapSerializer.serialize(JSONSer

  • java.lang.runtimeException:json无法序列化类型:在com.mongodb.util.ClassMapBasedObjectSerializer.serializer(ClassMapBasedObjectSerializer.java:77)在com.mongodb.util.ClassMapBasedObjectSerializer.serializer(jsonS

  • 问题内容: 作为一个小项目,我一直在尝试做一个小事,它可以读取序列化的lambda(从本地或从FTP)并调用它们的运行函数作为测试的一部分,以测试Windows中的文件关联(即打开某些文件类型)使用特定程序打开它们),但不管如何,无论如何,它似乎从未正确地反序列化。 lambda被这样声明 并使用由ObjectOutputStream包装的[n可选] BufferedOutputStream包装的

  • 问题内容: 在hibernate状态下执行条件查询时,出现以下异常: 可能是什么问题呢? PS:虽然可能不相关,但我的hibernate版本是hibernate-4.0.1 final。 问题答案: 问题在于被引用的实体对实体有另一个引用,并且该关系未由任何-like注释进行注释。

  • 问题内容: 我正在使用Hibernate的两个表,但我不明白为什么对于特定查询我有此问题。我希望有人意识到这个问题。 我有一个桌子用户 和一个桌子区域 日志说: 问题答案: 我建议仅在字段或getter上设置注释。我更喜欢田野,但那只是我的口味。 请参阅Hibernate中有关字段和属性访问的奇怪案例: 因此,要么将注释仅放置在字段上,要么仅放置在getters(properties)上。混合它们