我有一个场景,我想处理来自kafka主题的数据。我有这个特定的java代码来从kafka主题中读取数据作为流。
Dataset<Row> streamObjs = sparkSession.readStream().format("kafka")
.option("kafka.bootstrap.servers", bootstrapServers).option("subscribe", streamTopic)
.option("failOnDataLoss", false).load();
我将其转换为 String,定义架构,然后尝试使用水印(用于后期数据)和窗口(用于分组和聚合),最后输出到 kafka sink。
Dataset<Row> selectExprImporter = streamObjs.selectExpr("CAST(value AS STRING)");
StructType streamSchema = new StructType().add("id", DataTypes.StringType)
.add("timestamp", DataTypes.LongType)
.add("values", new MapType(DataTypes.StringType, DataTypes.DoubleType, false));
Dataset<Row> selectValueImporter = selectExprImporter
.select(functions.from_json(new Column("value"), streamSchema ).alias("data"));
.
.
(More transformations/operations)
.
.
Dataset<Row> aggCount_15min = streamData.withWatermark("timestamp", "2 minute")
.withColumn("frequency", functions.lit(15))
.groupBy(new Column("id"), new Column("frequency"),
functions.window(new Column("timestamp"), "15 minute").as("time_range"))
.agg(functions.mean("value").as("mean_value"), functions.sum("value").as("sum"),
functions.count(functions.lit(1)).as("number_of_values"))
.filter("mean_value > 35").orderBy("id", "frequency", "time_range");
aggCount_15min.selectExpr("to_json(struct(*)) AS value").writeStream()
.outputMode(OutputMode.Complete()).format("kafka").option("kafka.bootstrap.servers", bootstrapServers)
.option("topic", outputTopic).option("checkpointLocation", checkpointLocation).start().awaitTermination();
问题
>
我是否正确理解在 kafka 接收器中使用完整输出模式时,中间状态将永远增加,直到我出现内存不足异常?
此外,完整输出模式的理想用例是什么?仅当中间数据/状态没有增加时才使用它?
在我的例子中需要完整输出模式,因为我想使用orderBy子句。有没有什么方法可以让我强迫spark在每30分钟后放弃它的状态,重新处理新的数据?
有没有更好的方法不使用“完整输出”模式,但仍能获得期望的结果?我应该使用spark结构化流媒体之外的其他内容吗?
所需的结果是根据上面的查询聚合和分组数据,然后在创建第一个批次后,删除所有状态并为下一批重新开始。在这里,批处理可以是上次处理时间戳的函数。就像说删除所有状态并在当前时间戳从第一个收到的时间戳超过 20 分钟时重新开始,或者更好的是窗口时间(在本例中为 15 分钟)的函数,例如当处理了 4 批 15 分钟的窗口时,第 5 批的时间戳到达前 4 批的下降状态并重新开始此批次。
这个问题问了很多事情,而较少关注Spark Structured Streaming(SSS)的实际作用。然后回答您的编号问题、标题问题和非编号问题:
A.标题问题:
并非如此,但Complete模式仅存储聚合,因此并非存储所有数据,而是允许基于增量添加数据进行重新计算的状态。我发现手册在描述方面有误导性,但它可能是我的错。但否则你会得到这个错误:
org.apache.spark.sql.AnalysisException:当流式数据帧/数据集上没有流式聚合时,不支持完全输出模式
Kafka水槽不在这里。中间状态是Spark Structured Streaming需要存储的内容。它存储聚合并丢弃较新的数据。但最终你会因为这个或我怀疑的其他错误而得到OOM。
对于接收到的所有数据的聚合。你问题的第二部分不符合逻辑,因此我无法回答。状态通常会随着时间的推移而增加。
不,没有。即使试着优雅地停下来也不是一个主意,然后重新开始,因为时间还不到15分钟。而且,这是反对SSS方法的。从手册中可以看出:只有在聚合后和完全输出模式下,流式数据集才支持排序操作。您不能随意删除状态,再次聚合讨论。
不,因为您有许多当前实现无法满足的要求。除非您放弃订单,并在附加模式下执行非重叠窗口操作(15,15),如果内存正常工作,请使用微小的水印。随后,您将依靠下游处理进行排序,因为不支持排序。
最后一个整体问题:所需的结果是根据上面的查询聚合和分组数据,然后当第一批创建完成时,删除所有状态并为下一批重新开始。这里的批次可以是上次处理时间戳的函数。比如当当前时间戳从第一个接收到的时间戳超过20分钟时,删除所有状态并重新开始,或者更好,如果它是窗口时间的函数(在本例中为15分钟),比如当4批15分钟的窗口处理完成时,第五批的时间戳到达前4批的删除状态,并为这批重新开始。
虽然您的想法可能被认为是可以理解的,但SSS框架并不支持所有内容,特别是您想要的内容(还没有)。
我正在用ApacheSpark编写一个连续应用程序。在结构化流媒体的情况下,我试图从增量表中读取数据,通过时间窗口在事件时间执行流媒体聚合,并以追加模式将结果写入增量表。我对文档的期望是,在append模式下,只有一个时间窗口的最终聚合才会写入接收器。这不是我的经历。相反,我在我的目标增量表中看到了如下记录,与我尝试过的许多流配置无关(windowDuration=5分钟,slideDuratio
我正在从一个消息应用程序收集数据,我目前正在使用Flume,它每天发送大约5000万条记录 我希望使用Kafka,使用Spark Streaming从Kafka消费并将其持久化到hadoop并使用impala进行查询 我尝试的每种方法都有问题。。 方法1-将RDD另存为parquet,将外部配置单元parquet表指向parquet目录 问题是finalParquet.saveAsParquetF
我是scala新手,尝试从元素数组中创建自定义模式,以读取基于新自定义模式的文件。 我正在从json文件中读取数组,并使用爆炸方法为列数组中的每个元素创建了一个数据框。 获得的输出为: 现在,对于上面列出的所有值,我尝试使用下面的代码动态创建val模式 上面的问题是,我能够在struct中获取数据类型,但我也希望仅为数据类型decimal获取(scale和preicion),其限制条件为max a
null 触发器是否支持一次追加模式? 这里有一个最小的应用程序来再现这个问题。要旨
在带有postgres 9.6的Corda开源3.2上。我在交易中有300个义务状态,必须以原子方式转换为和现金状态作为输入以消除这些义务。 我在期间收到分页错误。可以放入事务中的状态有限制吗? 下面的堆栈跟踪
我正在使用spark structured streaming(2.2.1)来消费来自Kafka(0.10)的主题。 我的检查点位置设置在外部HDFS目录上。在某些情况下,我希望重新启动流式应用程序,从一开始就消费数据。然而,即使我从HDFS目录中删除所有检查点数据并重新提交jar,Spark仍然能够找到我上次使用的偏移量并从那里恢复。偏移量还在哪里?我怀疑与Kafka消费者ID有关。但是,我无法