我正在用ApacheSpark编写一个连续应用程序。在结构化流媒体的情况下,我试图从增量表中读取数据,通过时间窗口在事件时间执行流媒体聚合,并以追加模式将结果写入增量表。我对文档的期望是,在append模式下,只有一个时间窗口的最终聚合才会写入接收器。这不是我的经历。相反,我在我的目标增量表中看到了如下记录,与我尝试过的许多流配置无关(windowDuration=5分钟,slideDuration=20秒)。
流的输出示例
如上图所示,同一时间窗口向接收器贡献了许多记录。我确认每个微批次最多只能输出一条时间窗口记录,但一个时间窗口可以提供多个(数量上不明显一致)微批次的输出记录。以下是流式聚合代码的核心。
output_schema = create_trades_data_features_schema()
features_sdf = (trades_sdf.withWatermark("event_datetime", f"{trades_stream_watermark_secs} seconds")
.withColumn('time_window', f.window(timeColumn=f.col('event_datetime'),
windowDuration=f"{analysis_window_length_secs} seconds",
slideDuration=f"{analysis_window_hop_size_secs} seconds"))
.groupBy('time_window')
.applyInPandas(lambda pdf: generate_trades_data_features(pdf, output_schema, data_type_cast), output_schema))
Pandas UDF创建一些保存标量值的变量,构造形状为[1,N]的Pandas数据帧,并将其作为结果输出。也就是说,它返回一行。我唯一关注的是时间窗。我怎么能在同一个时间窗口内获得多条记录?我以多种方式创建并关闭了流,每次都收到了相同的结果(例如,根据Delta Lake文档、结构化流媒体指南,以及read/load/table/toTable API选项,尝试我能找到的每个选项配置……是的,很多小时的暴力)。我还尝试了水印持续时间和触发周期的不同范围的值;都没有产生影响。
这是追加模式下的预期行为吗(即,同一时间窗口中有多条记录)?
编辑:我使用的是Databricks运行时版本8.3 ML。它有Spark版本“3.1.1”。
编辑2:我暂时考虑这个问题是否相关:https://issues.apache.org/jira/browse/SPARK-25756
为了避免这一点,我将在下面草草记下我的初步结论,并在了解更多信息后进行更新。这可能是错误的。请不要因此而影响其他答案/评论。
总的来说,这不是故意的行为。每个微批次被单独发送到Pandas UDF(即,在每个触发器上,当前微批次和仅该微批次被发送到UDF),并导致结果表中的一条记录被发送到接收器,尽管处于附加模式。开发人员已经注意到这个问题,并且至少创建了一个JIRA问题来解决它。此工作线程似乎处于非活动状态。
其他数据点和建议:
非常有兴趣听到潜在的变通方法,或者如果我得出上面不正确的结论。干杯。
这是因为检查点只存储了其中一个数据流的偏移量吗?浏览Spark结构流文档,似乎可以在Spark 2.2或>中进行流源的联接/联合
我在Spark结构化流媒体中使用Kafka源来接收融合编码的Avro记录。我打算使用Confluent Schema Registry,但与spark结构化流媒体的集成似乎是不可能的。 我已经看到了这个问题,但无法让它与融合模式注册表一起工作。使用Spark 2.0.2(结构化流媒体)阅读Kafka的Avro信息
我第一次使用pyspark。Spark版本:2.3.0Kafka版本:2.2.0 我有一个Kafka制作人,它以avro格式发送嵌套数据,我正试图在pyspark中编写spark流/结构化流的代码,它将来自Kafka的avro反序列化为数据帧,并进行转换,将其以拼花格式写入s3。我在spark/scala中找到了avro转换器,但pyspark中的支持尚未添加。如何在pyspark中转换相同的值。
我一直在用Scala 2.11阅读spark structured streaming(2.4.4)中Kafka的avro序列化消息。为此,我使用了spark avro(下面的dependency)。我使用合流Kafka库从python生成Kafka消息。Spark streaming能够使用模式来使用消息,但无法正确读取字段的值。我准备了一个简单的例子来说明这个问题,代码在这里可用:https: