当前位置: 首页 > 知识库问答 >
问题:

如何在传统Spark流媒体中使用foreachRDD

姚海
2023-03-14

我在使用foreachRDD进行CSV数据处理时遇到异常。这是我的代码

  case class Person(name: String, age: Long)
  val conf = new SparkConf()
  conf.setMaster("local[*]")
  conf.setAppName("CassandraExample").set("spark.driver.allowMultipleContexts", "true")
  val ssc = new StreamingContext(conf, Seconds(10))
  val smDstream=ssc.textFileStream("file:///home/sa/testFiles")

  smDstream.foreachRDD((rdd,time) => {
  val peopleDF = rdd.map(_.split(",")).map(attributes => 
  Person(attributes(0), attributes(1).trim.toInt)).toDF()
  peopleDF.createOrReplaceTempView("people")
  val teenagersDF = spark.sql("insert into table devDB.stam SELECT name, age 
  FROM people WHERE age BETWEEN 13 AND 29")
  //teenagersDF.show  
    })
  ssc.checkpoint("hdfs://go/hive/warehouse/devDB.db")
  ssc.start()

我得到以下错误。伊奥。NotSerializableException:已启用数据流检查点,但具有其功能的数据流不可序列化。阿帕奇。火花流动。StreamingContext序列化堆栈:-对象不可序列化(类:org.apache.spark.streaming.StreamingContext,值:org.apache.spark.streaming)。StreamingContext@1263422a)-字段(类:$iw,名称:ssc,类型:class org.apache.spark.streaming.StreamingContext)

请帮忙

共有1个答案

鲜于勇
2023-03-14

这个问题已经没有意义了,因为数据流正在被弃用/废弃。

代码中有几点需要考虑,因此很难收集到确切的问题。也就是说,我不得不深思熟虑,因为我不是序列化专家。

你可以找到一些帖子,其中一些人试图直接写入配置单元表,而不是路径。在我的回答中,我使用了一种方法,但你可以使用Spark SQL的方法为TempView编写,这都是可能的。

我模拟了来自队列流的输入,因此不需要应用拆分。如果你遵循同样的“全球”方法,你可以根据自己的情况进行调整。我选择写一个拼花文件,如果需要的话可以创建。您可以创建tempView,然后使用spark。按照您最初的方法使用sql。

DStreams上的输出操作是:

  • print()

foreachRDD

对从流生成的每个RDD应用函数func的最通用的输出运算符。此函数应将每个RDD中的数据推送到外部系统,例如将RDD保存到文件中,或通过网络将其写入数据库。请注意,函数func是在运行流媒体应用程序的驱动程序进程中执行的,通常会在其中执行RDD操作,从而强制计算流媒体RDD。

声明保存到文件,但它可以通过foreachRDD做你想做的事情,尽管我认为这个想法是为了外部系统。在我看来,保存到文件比直接编写表更快。您希望通过流媒体尽快卸载数据,因为流量通常很高。

两个步骤:

在流媒体类的单独类中-在Spark 2.4下运行:

case class Person(name: String, age: Int)

然后,你需要应用的流逻辑——你可能需要我在笔记本中的一些导入,否则,当我在DataBricks下运行时:

import org.apache.spark.sql.SparkSession
import org.apache.spark.rdd.RDD
import org.apache.spark.streaming.{Seconds, StreamingContext}
import scala.collection.mutable
import org.apache.spark.sql.SaveMode

val spark = SparkSession
           .builder
           .master("local[4]")
           .config("spark.driver.cores", 2)
           .appName("forEachRDD")
           .getOrCreate()

val sc = spark.sparkContext
val ssc = new StreamingContext(spark.sparkContext, Seconds(1)) 

val rddQueue = new mutable.Queue[RDD[List[(String, Int)]]]()
val QS = ssc.queueStream(rddQueue) 

QS.foreachRDD(q => {
   if(!q.isEmpty) {   
      val q_flatMap = q.flatMap{x=>x}
      val q_withPerson = q_flatMap.map(field => Person(field._1, field._2))
      val df = q_withPerson.toDF()      

      df.write
        .format("parquet")
        .mode(SaveMode.Append)
        .saveAsTable("SO_Quest_BigD")
   }
 }
)

ssc.start()
for (c <- List(List(("Fred",53), ("John",22), ("Mary",76)), List(("Bob",54), ("Johnny",92), ("Margaret",15)), List(("Alfred",21), ("Patsy",34), ("Sylvester",7)) )) {
   rddQueue += ssc.sparkContext.parallelize(List(c))
} 
ssc.awaitTermination()    
 类似资料:
  • 我正在尝试从Spark官方网站运行Spark Streaming示例 这些是我在pom文件中使用的依赖项: 这是我的Java代码: 当我尝试从Eclipse运行它时,我遇到以下异常: 我从我的IDE(eclipse)运行它。我是否必须创建并将JAR部署到火花中以使其运行。如果有人知道这个异常,请分享您的经验。提前谢谢

  • 我知道Spark Streaming会生成成批的RDD,但我想积累一个大数据帧,随着每批数据的更新而更新(通过在末尾添加新的数据帧)。 有没有办法像这样访问所有历史流数据? 我看过mapWithState(),但没有看到它专门积累数据帧。

  • 我想从React客户端向服务器发送文件。我知道为此,我应该使用grpc流媒体并将文件分割成块。所以,问题是如何将这些块发送到服务器?

  • 问题内容: 我正在使用Maven 我添加了以下依赖项 我还在代码中添加了jar 它完全可以正常工作,没有任何错误,在通过spark-submit提交时出现以下错误,非常感谢您的帮助。谢谢你的时间。 线程“主要” java.lang.NoClassDefFoundError中的异常:sun.reflect处的KafkaSparkStreaming.sparkStreamingTest(KafkaSp

  • 我正在读这篇博文: http://blog.jaceklaskowski.pl/2015/07/20/real-time-data-processing-using-apache-kafka-and-spark-streaming.html 它讨论了如何使用Spark Streaming和Apache Kafka进行一些近实时处理。我完全理解这篇文章。它确实展示了我如何使用Spark Stream

  • 我尝试在spark中使用结构化流媒体,因为它非常适合我的用例。然而,我似乎找不到将Kafka传入的数据映射到case类的方法。根据官方文件,我可以做到这一点。 mobEventDF有这样一个模式 有没有更好的方法?如何将其直接映射到下面的Scala Case类中?