当前位置: 首页 > 知识库问答 >
问题:

在完全输出模式下,Spark结构化流是否可以丢弃/控制中间状态?(火花2.4.0)

萧星火
2023-03-14

我有一个场景,我想处理来自kafka主题的数据。我有这个特定的java代码来从kafka主题中读取数据作为流。

Dataset<Row> streamObjs = sparkSession.readStream().format("kafka")
                .option("kafka.bootstrap.servers", bootstrapServers).option("subscribe", streamTopic)
                .option("failOnDataLoss", false).load();

我将其转换为 String,定义架构,然后尝试使用水印(用于后期数据)和窗口(用于分组和聚合),最后输出到 kafka sink。

Dataset<Row> selectExprImporter = streamObjs.selectExpr("CAST(value AS STRING)");

StructType streamSchema = new StructType().add("id", DataTypes.StringType)
                .add("timestamp", DataTypes.LongType)
                .add("values", new MapType(DataTypes.StringType, DataTypes.DoubleType, false));

Dataset<Row> selectValueImporter = selectExprImporter
                .select(functions.from_json(new Column("value"), streamSchema ).alias("data"));
.
.
(More transformations/operations)
.
.

Dataset<Row> aggCount_15min = streamData.withWatermark("timestamp", "2 minute")
                .withColumn("frequency", functions.lit(15))
                .groupBy(new Column("id"), new Column("frequency"),
                        functions.window(new Column("timestamp"), "15 minute").as("time_range"))
                .agg(functions.mean("value").as("mean_value"), functions.sum("value").as("sum"),
                        functions.count(functions.lit(1)).as("number_of_values"))
                .filter("mean_value > 35").orderBy("id", "frequency", "time_range");

aggCount_15min.selectExpr("to_json(struct(*)) AS value").writeStream()
                .outputMode(OutputMode.Complete()).format("kafka").option("kafka.bootstrap.servers", bootstrapServers)
                .option("topic", outputTopic).option("checkpointLocation", checkpointLocation).start().awaitTermination();

问题

>

  • 我是否正确理解在 kafka 接收器中使用完整输出模式时,中间状态将永远增加,直到我出现内存不足异常

    此外,完整输出模式的理想用例是什么?仅当中间数据/状态没有增加时才使用它?

    在我的例子中需要完整输出模式,因为我想使用orderBy子句。有没有什么方法可以让我强迫spark在每30分钟后放弃它的状态,重新处理新的数据?

    有没有更好的方法不使用“完整输出”模式,但仍能获得期望的结果?我应该使用spark结构化流媒体之外的其他内容吗?

    所需的结果是根据上面的查询聚合和分组数据,然后在创建第一个批次后,删除所有状态并为下一批重新开始。在这里,批处理可以是上次处理时间戳的函数。就像说删除所有状态并在当前时间戳从第一个收到的时间戳超过 20 分钟时重新开始,或者更好的是窗口时间(在本例中为 15 分钟)的函数,例如当处理了 4 批 15 分钟的窗口时,第 5 批的时间戳到达前 4 批的下降状态并重新开始此批次。

  • 共有1个答案

    史超英
    2023-03-14

    这个问题问了很多事情,而较少关注Spark Structured Streaming(SSS)的实际作用。然后回答您的编号问题、标题问题和非编号问题:

    A.标题问题:

    并非如此,但Complete模式仅存储聚合,因此并非存储所有数据,而是允许基于增量添加数据进行重新计算的状态。我发现手册在描述方面有误导性,但它可能是我的错。但否则你会得到这个错误:

    org.apache.spark.sql.AnalysisException:当流式数据帧/数据集上没有流式聚合时,不支持完全输出模式

    1. 当在kafka接收器中使用完整输出模式时,中间状态将一直增加,直到我出现OutOfMemory异常,我的理解是否正确

    Kafka水槽不在这里。中间状态是Spark Structured Streaming需要存储的内容。它存储聚合并丢弃较新的数据。但最终你会因为这个或我怀疑的其他错误而得到OOM。

    对于接收到的所有数据的聚合。你问题的第二部分不符合逻辑,因此我无法回答。状态通常会随着时间的推移而增加。

    不,没有。即使试着优雅地停下来也不是一个主意,然后重新开始,因为时间还不到15分钟。而且,这是反对SSS方法的。从手册中可以看出:只有在聚合后和完全输出模式下,流式数据集才支持排序操作。您不能随意删除状态,再次聚合讨论。

    不,因为您有许多当前实现无法满足的要求。除非您放弃订单,并在附加模式下执行非重叠窗口操作(15,15),如果内存正常工作,请使用微小的水印。随后,您将依靠下游处理进行排序,因为不支持排序。

    最后一个整体问题:所需的结果是根据上面的查询聚合和分组数据,然后当第一批创建完成时,删除所有状态并为下一批重新开始。这里的批次可以是上次处理时间戳的函数。比如当当前时间戳从第一个接收到的时间戳超过20分钟时,删除所有状态并重新开始,或者更好,如果它是窗口时间的函数(在本例中为15分钟),比如当4批15分钟的窗口处理完成时,第五批的时间戳到达前4批的删除状态,并为这批重新开始。

    虽然您的想法可能被认为是可以理解的,但SSS框架并不支持所有内容,特别是您想要的内容(还没有)。

     类似资料:
    • 我正在用ApacheSpark编写一个连续应用程序。在结构化流媒体的情况下,我试图从增量表中读取数据,通过时间窗口在事件时间执行流媒体聚合,并以追加模式将结果写入增量表。我对文档的期望是,在append模式下,只有一个时间窗口的最终聚合才会写入接收器。这不是我的经历。相反,我在我的目标增量表中看到了如下记录,与我尝试过的许多流配置无关(windowDuration=5分钟,slideDuratio

    • 我正在从一个消息应用程序收集数据,我目前正在使用Flume,它每天发送大约5000万条记录 我希望使用Kafka,使用Spark Streaming从Kafka消费并将其持久化到hadoop并使用impala进行查询 我尝试的每种方法都有问题。。 方法1-将RDD另存为parquet,将外部配置单元parquet表指向parquet目录 问题是finalParquet.saveAsParquetF

    • 我是scala新手,尝试从元素数组中创建自定义模式,以读取基于新自定义模式的文件。 我正在从json文件中读取数组,并使用爆炸方法为列数组中的每个元素创建了一个数据框。 获得的输出为: 现在,对于上面列出的所有值,我尝试使用下面的代码动态创建val模式 上面的问题是,我能够在struct中获取数据类型,但我也希望仅为数据类型decimal获取(scale和preicion),其限制条件为max a

    • null 触发器是否支持一次追加模式? 这里有一个最小的应用程序来再现这个问题。要旨

    • 在带有postgres 9.6的Corda开源3.2上。我在交易中有300个义务状态,必须以原子方式转换为和现金状态作为输入以消除这些义务。 我在期间收到分页错误。可以放入事务中的状态有限制吗? 下面的堆栈跟踪

    • 我正在使用spark structured streaming(2.2.1)来消费来自Kafka(0.10)的主题。 我的检查点位置设置在外部HDFS目录上。在某些情况下,我希望重新启动流式应用程序,从一开始就消费数据。然而,即使我从HDFS目录中删除所有检查点数据并重新提交jar,Spark仍然能够找到我上次使用的偏移量并从那里恢复。偏移量还在哪里?我怀疑与Kafka消费者ID有关。但是,我无法