当前位置: 首页 > 知识库问答 >
问题:

Spark结构流媒体-联合两个或多个流媒体源

韩佐
2023-03-14
val finalDF = flatDF1
      .union(flatDF2)
      .union(flatDF3)

val query = finalDF.writeStream
      .format("parquet")
      .outputMode("append")
      .option("path", hdfsLocation)
      .option("checkpointLocation", checkpointLocation)
      .option("failOnDataLoss", false)
      .start()

    query.awaitTermination()
Caused by: java.lang.AssertionError: assertion failed
    at scala.Predef$.assert(Predef.scala:156)
    at org.apache.spark.sql.execution.streaming.OffsetSeq.toStreamProgress(OffsetSeq.scala:42)
    at org.apache.spark.sql.execution.streaming.MicroBatchExecution.org$apache$spark$sql$execution$streaming$MicroBatchExecution$$populateStartOffsets(MicroBatchExecution.scala:185)
    at org.apache.spark.sql.execution.streaming.MicroBatchExecution$$anonfun$runActivatedStream$1$$anonfun$apply$mcZ$sp$1.apply$mcV$sp(MicroBatchExecution.scala:124)
    at org.apache.spark.sql.execution.streaming.MicroBatchExecution$$anonfun$runActivatedStream$1$$anonfun$apply$mcZ$sp$1.apply(MicroBatchExecution.scala:121)
    at org.apache.spark.sql.execution.streaming.MicroBatchExecution$$anonfun$runActivatedStream$1$$anonfun$apply$mcZ$sp$1.apply(MicroBatchExecution.scala:121)
case class OffsetSeq(offsets: Seq[Option[Offset]], metadata: Option[OffsetSeqMetadata] = None) {

assert(sources.size == offsets.size)

这是因为检查点只存储了其中一个数据流的偏移量吗?浏览Spark结构流文档,似乎可以在Spark 2.2或>中进行流源的联接/联合

共有1个答案

彭涵衍
2023-03-14

首先,请定义您的case类OffsetSeq如何与具有数据规则联合的代码相关联。

接下来,在执行这个联合并用WriteStream写到Kafka时,检查点是一个真正的问题。分离成多个WriteStream--每个writestreams都有自己的检查点--会因为联合操作而混淆批ID。使用相同的writestream与union of dataframes失败,因为检查点似乎查找union之前生成dataframes的所有模型,并且无法区分哪些行/记录来自哪些Dataframe/Model。

为了写到Kafka,从结构化sql流统一了Dataframes-最好使用writestream与foreach和ForEachWriter,包括进程方法中的Kafka生产者。不需要检查点;应用程序只使用临时检查点文件,这些文件被设置为在适当的时候删除--将“ForceDeleteTempCheckPointLocation”设置为true--在会话生成器中。

val output = dataFrameModelArray.reduce(_ union _)
val stream: StreamingQuery = output
  .writeStream.foreach(new ForeachWriter[Row] {

    def open(partitionId: Long, version: Long): Boolean = {
      true
    }

    def process(row: Row): Unit = {
      val producer: KafkaProducer[String, String] = new KafkaProducer[String, String](props)
      val record = new ProducerRecord[String, String](producerTopic, row.getString(0), row.getString(1))
      producer.send(record)
    }

    def close(errorOrNull: Throwable): Unit = {
    }
  }
).start()

如果需要,可以在过程方法中添加更多的逻辑。

注在联合之前,所有要联合的数据集都已转换为键值字符串列。值是要通过Kafka生成器发送的消息数据的json字符串。这对于在尝试联合之前获得write也是非常重要的。

svcModel.transform(query)
    .select($"key", $"uuid", $"currentTime", $"label", $"rawPrediction", $"prediction")
    .selectExpr("key", "to_json(struct(*)) AS value")
    .selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)")

其中svcModel是DataFrameModelArray中的dataframe。

 类似资料:
  • 我以前能够运行Kafka结构流编程。但是突然间,我所有的结构流python程序都失败了,出现了一个错误。我从Spark网站上拿了基本的Kafka结构流式编程,也以同样的错误失败。 spark-submit--packages org.apache.spark:spark-sql-kafka-0-102.11:2.2.0c:\users\ranjith.gangam\pycharmprojects\

  • 我第一次使用pyspark。Spark版本:2.3.0Kafka版本:2.2.0 我有一个Kafka制作人,它以avro格式发送嵌套数据,我正试图在pyspark中编写spark流/结构化流的代码,它将来自Kafka的avro反序列化为数据帧,并进行转换,将其以拼花格式写入s3。我在spark/scala中找到了avro转换器,但pyspark中的支持尚未添加。如何在pyspark中转换相同的值。

  • 问题内容: 我正在使用Maven 我添加了以下依赖项 我还在代码中添加了jar 它完全可以正常工作,没有任何错误,在通过spark-submit提交时出现以下错误,非常感谢您的帮助。谢谢你的时间。 线程“主要” java.lang.NoClassDefFoundError中的异常:sun.reflect处的KafkaSparkStreaming.sparkStreamingTest(KafkaSp

  • 我正在读这篇博文: http://blog.jaceklaskowski.pl/2015/07/20/real-time-data-processing-using-apache-kafka-and-spark-streaming.html 它讨论了如何使用Spark Streaming和Apache Kafka进行一些近实时处理。我完全理解这篇文章。它确实展示了我如何使用Spark Stream

  • Streaming API用于通过令牌读取JSON令牌。 它将JSON内容读写为离散事件。 JsonReader和JsonWriter将数据读/写为令牌,称为JsonToken 。 它是处理JSON的三种方法中最强大的方法。 它具有最低的开销,并且在读/写操作中非常快。 它类似于XML的Stax解析器。 在本章中,我们将展示使用GSON流API来读取JSON数据。 Streaming API与to