当前位置: 首页 > 知识库问答 >
问题:

Spark结构化流媒体在追加模式下,每个时间窗口输出许多行

杨曜瑞
2023-03-14

我正在用ApacheSpark编写一个连续应用程序。在结构化流媒体的情况下,我试图从增量表中读取数据,通过时间窗口在事件时间执行流媒体聚合,并以追加模式将结果写入增量表。我对文档的期望是,在append模式下,只有一个时间窗口的最终聚合才会写入接收器。这不是我的经历。相反,我在我的目标增量表中看到了如下记录,与我尝试过的许多流配置无关(windowDuration=5分钟,slideDuration=20秒)。

流的输出示例

如上图所示,同一时间窗口向接收器贡献了许多记录。我确认每个微批次最多只能输出一条时间窗口记录,但一个时间窗口可以提供多个(数量上不明显一致)微批次的输出记录。以下是流式聚合代码的核心。

output_schema = create_trades_data_features_schema()
features_sdf = (trades_sdf.withWatermark("event_datetime", f"{trades_stream_watermark_secs} seconds")
                          .withColumn('time_window', f.window(timeColumn=f.col('event_datetime'),
                                                              windowDuration=f"{analysis_window_length_secs} seconds",
                                                              slideDuration=f"{analysis_window_hop_size_secs} seconds"))
                          .groupBy('time_window')
                          .applyInPandas(lambda pdf: generate_trades_data_features(pdf, output_schema, data_type_cast), output_schema))

Pandas UDF创建一些保存标量值的变量,构造形状为[1,N]的Pandas数据帧,并将其作为结果输出。也就是说,它返回一行。我唯一关注的是时间窗。我怎么能在同一个时间窗口内获得多条记录?我以多种方式创建并关闭了流,每次都收到了相同的结果(例如,根据Delta Lake文档、结构化流媒体指南,以及read/load/table/toTable API选项,尝试我能找到的每个选项配置……是的,很多小时的暴力)。我还尝试了水印持续时间和触发周期的不同范围的值;都没有产生影响。

这是追加模式下的预期行为吗(即,同一时间窗口中有多条记录)?

编辑:我使用的是Databricks运行时版本8.3 ML。它有Spark版本“3.1.1”。

编辑2:我暂时考虑这个问题是否相关:https://issues.apache.org/jira/browse/SPARK-25756

共有1个答案

凌和悦
2023-03-14

为了避免这一点,我将在下面草草记下我的初步结论,并在了解更多信息后进行更新。这可能是错误的。请不要因此而影响其他答案/评论。

总的来说,这不是故意的行为。每个微批次被单独发送到Pandas UDF(即,在每个触发器上,当前微批次和仅该微批次被发送到UDF),并导致结果表中的一条记录被发送到接收器,尽管处于附加模式。开发人员已经注意到这个问题,并且至少创建了一个JIRA问题来解决它。此工作线程似乎处于非活动状态。

其他数据点和建议:

  • 不同论坛(如Databricks)中的多个问题,以及上面链接的JIRA问题,直接引用或提供了Spark中该漏洞的明确示例
  • 该问题自2018年以来一直存在,版本3.1.2似乎有一个修复程序,但JIRA问题已批量解决,我看不到讨论/工作的继续
  • 目前,对于Python开发人员来说,Spark Structured Streaming只支持流聚合上的琐碎数据转换(即,可以在GroupedData对象上运行的函数,apply或ApplyPandas除外)
  • 如果您正在为一个非平凡的应用程序寻找一个流计算引擎,在这个问题得到解决之前,不要指望Python Spark API会提供支持

非常有兴趣听到潜在的变通方法,或者如果我得出上面不正确的结论。干杯。

 类似资料: