当前位置: 首页 > 知识库问答 >
问题:

结构化流媒体:水印与一次性语义

呼延弘方
2023-03-14

《编程指南》说,结构化流媒体保证使用适当的源/汇实现端到端的一次语义。

然而,我不明白当工作崩溃,我们应用了水印时,这是如何工作的。

下面是一个例子,我目前想象它是如何工作的,请纠正我对任何问题的误解。提前谢谢!

例子:

Spark Job:在每个1小时窗口中统计#个事件,带有1小时的水印。

信息:

  • A-时间戳上午10点
  • B-时间戳上午10:10
  • C-时间戳上午10:20
  • X-时间戳12pm
  • Y-时间戳12:50pm
  • Z-时间戳8pm

我们开始工作,从源代码中读取A、B、C,工作在上午10:30崩溃,然后我们才把它们写到我们的水槽中。

下午6点,作业返回并知道使用保存的检查点/WAL重新处理A、B、C。上午10点至11点窗口的最终计数为3。

接下来,它并行读取来自Kafka、X、Y、Z的新消息,因为它们属于不同的分区。Z首先被处理,所以最大事件时间戳被设置为8pm。当作业读取X和Y时,它们现在位于水印后面(8pm-1小时=7pm),因此它们作为旧数据被丢弃。晚上8-9点的最终计数为1,在下午12-1点的窗口内,作业不会报告任何内容。我们丢失了X和Y的数据。

---结束例子---

这种情况准确吗?如果是这样,1小时水印可能足以在KafkaSspark正常流动时处理延迟/无序数据,但在spark作业停止/Kafka连接长时间丢失时则不足以处理。避免数据丢失的唯一选择是使用水印的时间比您预期的工作时间长吗?

共有2个答案

端木飞
2023-03-14

Z首先被处理,因此最大事件时间戳被设置为8pm。

没错。即使可以首先计算Z,水印也会从当前查询迭代中的最大时间戳中减去。这意味着08:00 PM将被设置为我们从中减去水印时间的时间,这意味着12:00和12:50将被丢弃。

从留档:

对于在时间T开始的特定窗口,引擎将保持状态并允许延迟数据更新状态,直到(引擎看到的最大事件时间-延迟阈值

避免数据丢失的唯一选择是使用水印的时间比预期的工作时间长吗

不一定。假设您将每次Kafka查询要读取的最大数据量设置为100项。如果读取的是小批量,并且从每个分区连续读取,则每个批的每个最大时间戳可能不是代理中最新消息的最长时间,这意味着您不会丢失这些消息。

韦繁
2023-03-14

水印是小批量期间的固定值。在您的示例中,由于X、Y和Z在同一个小批量中处理,因此用于此记录的水印将是上午9:20。完成后,小批量水印将更新到晚上7点。

下面是SPARK-18124功能部件设计文件中的引用,该功能部件实现了水印功能:

要在基于触发器的执行中计算下降边界,我们必须执行以下操作。

  • 在每个触发器中,在聚合数据的同时,我们还扫描触发器数据中事件时间的最大值
  • 触发器完成后,计算水印=最大值(触发器前的事件时间,触发器中的最大事件时间)-阈值

也许模拟更适合描述:

import org.apache.hadoop.fs.Path
import java.sql.Timestamp
import org.apache.spark.sql.types._
import org.apache.spark.sql.streaming.ProcessingTime

val dir = new Path("/tmp/test-structured-streaming")
val fs = dir.getFileSystem(sc.hadoopConfiguration)
fs.mkdirs(dir)

val schema = StructType(StructField("vilue", StringType) ::
                        StructField("timestamp", TimestampType) ::
                        Nil)

val eventStream = spark
  .readStream
  .option("sep", ";")
  .option("header", "false")
  .schema(schema)
  .csv(dir.toString)

// Watermarked aggregation
val eventsCount = eventStream
  .withWatermark("timestamp", "1 hour")
  .groupBy(window($"timestamp", "1 hour"))
  .count

def writeFile(path: Path, data: String) {
  val file = fs.create(path)
  file.writeUTF(data)
  file.close()
}

// Debug query
val query = eventsCount.writeStream
  .format("console")
  .outputMode("complete")
  .option("truncate", "false")
  .trigger(ProcessingTime("5 seconds"))
  .start()

writeFile(new Path(dir, "file1"), """
  |A;2017-08-09 10:00:00
  |B;2017-08-09 10:10:00
  |C;2017-08-09 10:20:00""".stripMargin)

query.processAllAvailable()
val lp1 = query.lastProgress

// -------------------------------------------
// Batch: 0
// -------------------------------------------
// +---------------------------------------------+-----+
// |window                                       |count|
// +---------------------------------------------+-----+
// |[2017-08-09 10:00:00.0,2017-08-09 11:00:00.0]|3    |
// +---------------------------------------------+-----+

// lp1: org.apache.spark.sql.streaming.StreamingQueryProgress =
// {
//   ...
//   "numInputRows" : 3,
//   "eventTime" : {
//     "avg" : "2017-08-09T10:10:00.000Z",
//     "max" : "2017-08-09T10:20:00.000Z",
//     "min" : "2017-08-09T10:00:00.000Z",
//     "watermark" : "1970-01-01T00:00:00.000Z"
//   },
//   ...
// }


writeFile(new Path(dir, "file2"), """
  |Z;2017-08-09 20:00:00
  |X;2017-08-09 12:00:00
  |Y;2017-08-09 12:50:00""".stripMargin)

query.processAllAvailable()
val lp2 = query.lastProgress

// -------------------------------------------
// Batch: 1
// -------------------------------------------
// +---------------------------------------------+-----+
// |window                                       |count|
// +---------------------------------------------+-----+
// |[2017-08-09 10:00:00.0,2017-08-09 11:00:00.0]|3    |
// |[2017-08-09 12:00:00.0,2017-08-09 13:00:00.0]|2    |
// |[2017-08-09 20:00:00.0,2017-08-09 21:00:00.0]|1    |
// +---------------------------------------------+-----+
  
// lp2: org.apache.spark.sql.streaming.StreamingQueryProgress =
// {
//   ...
//   "numInputRows" : 3,
//   "eventTime" : {
//     "avg" : "2017-08-09T14:56:40.000Z",
//     "max" : "2017-08-09T20:00:00.000Z",
//     "min" : "2017-08-09T12:00:00.000Z",
//     "watermark" : "2017-08-09T09:20:00.000Z"
//   },
//   "stateOperators" : [ {
//     "numRowsTotal" : 3,
//     "numRowsUpdated" : 2
//   } ],
//   ...
// }

writeFile(new Path(dir, "file3"), "")

query.processAllAvailable()
val lp3 = query.lastProgress

// -------------------------------------------
// Batch: 2
// -------------------------------------------
// +---------------------------------------------+-----+
// |window                                       |count|
// +---------------------------------------------+-----+
// |[2017-08-09 10:00:00.0,2017-08-09 11:00:00.0]|3    |
// |[2017-08-09 12:00:00.0,2017-08-09 13:00:00.0]|2    |
// |[2017-08-09 20:00:00.0,2017-08-09 21:00:00.0]|1    |
// +---------------------------------------------+-----+
  
// lp3: org.apache.spark.sql.streaming.StreamingQueryProgress =
// {
//   ...
//   "numInputRows" : 0,
//   "eventTime" : {
//     "watermark" : "2017-08-09T19:00:00.000Z"
//   },
//   "stateOperators" : [ ],
//   ...
// }

query.stop()
fs.delete(dir, true)

注意Batch 0如何以水印1970-01-01 00:00:00开始,而Batch 1以水印2017-08-09 09:20:00开始(Batch 0的最大事件时间减1小时)。批次2,空时,使用水印2017-08-09 19:00:00

 类似资料:
  • 我以前能够运行Kafka结构流编程。但是突然间,我所有的结构流python程序都失败了,出现了一个错误。我从Spark网站上拿了基本的Kafka结构流式编程,也以同样的错误失败。 spark-submit--packages org.apache.spark:spark-sql-kafka-0-102.11:2.2.0c:\users\ranjith.gangam\pycharmprojects\

  • 我第一次使用pyspark。Spark版本:2.3.0Kafka版本:2.2.0 我有一个Kafka制作人,它以avro格式发送嵌套数据,我正试图在pyspark中编写spark流/结构化流的代码,它将来自Kafka的avro反序列化为数据帧,并进行转换,将其以拼花格式写入s3。我在spark/scala中找到了avro转换器,但pyspark中的支持尚未添加。如何在pyspark中转换相同的值。

  • 我试图从kafka主题获取数据并将其推送到hdfs位置。我面临以下问题。 在每条消息(kafka)之后,hdfs位置都会更新为带有.c000.csv格式的部分文件。我已经在HDFS位置的顶部创建了一个hive表,但是HIVE无法读取从火花结构化流写入的任何数据。 以下是spark结构化流媒体之后的文件格式 以下是我要插入的代码: 谁能帮帮我,为什么要创建这样的文件? 如果我执行dfs-cat/pa

  • 我有一个用于结构化流媒体的Kafka和Spark应用程序。特别是我的KafkaProducer具有以下配置: 然后我创建了一个ProducerRecord,如下所示: 其中,json。toString()表示一个JSON格式的字符串,这是我想在Spark中处理的值。现在,我主要做的是将Spark与Kafka主题联系起来,正如官方Spark结构化流媒体指南中所报道的那样: 然后 我有以下输出和异常:

  • 在过去的几个月里,我已经使用了相当多的结构化流来实现流作业(在大量使用Kafka之后)。在阅读了《Stream Processing with Apache Spark》一书之后,我有这样一个问题:有没有什么观点或用例可以让我使用Spark Streaming而不是Structured Streaming?如果我投入一些时间来研究它,或者由于im已经使用了Spark结构化流,我应该坚持使用它,而之

  • 这是因为检查点只存储了其中一个数据流的偏移量吗?浏览Spark结构流文档,似乎可以在Spark 2.2或>中进行流源的联接/联合