当前位置: 首页 > 知识库问答 >
问题:

KStream 窗口聚合

那谦
2023-03-14

尝试合并多个 Kafka 流,聚合

val streams = requestStreams.merge(successStreams).merge(errorStreams)
                .groupByKey(Grouped.with(Serdes.String(), serdesConfig.notificationSerde()))
                .windowedBy(TimeWindows.of(Duration.ofMinutes(10)))
                .aggregate({ null }, StreamAggregators.notificationMetricAggregator, Materialized.`as`<String, NotificationMetric, WindowStore<Bytes, ByteArray>>("ag-store")
                        .withValueSerde(serdesConfig.notificationMetricSerde()))
                .toStream()

streams.to(notificationStreamsConfig.metricsTopic, Produced.with(WindowedSerdes.timeWindowedSerdeFrom(String::class.java, 10), serdesConfig.notificationMetricSerde()))

共有1个答案

史阳晖
2023-03-14

Kafka Streams默认使用连续更新处理模型。注意,聚合的结果是一个KTable。此结果表包含每个窗口的一行,每次处理新记录时,都会更新窗口(即表中的行)。

如果您调用Ktable#toStream(),您将获得表的更改日志流,其中包含表的每次更新的记录。

如果希望每个窗口只获得一个结果,可以使用< code>suppress()运算符来获得第二个< code>KTable,即< code>suppress()获取第一个< code>KTable的changelog流,并等待直到窗口关闭,然后只将最终结果插入其输出< code>KTable中。如果使用< code>suppress(),应该将上行窗口聚合的宽限期(默认值为24小时)设置为一个较低的值,即< code>TimeWindows.of(...).格蕾丝(...)。

有关更多详细信息,请查看此博客文章:https://www.confluent.io/blog/kafka-streams-take-on-watermarks-and-triggers

 类似资料:
  • 基于apache Kafka文档,我的问题是如何控制窗口的大小?保持主题上的数据的大小是一样的吗?或者例如,我们可以将数据保留一个月,但只加入过去一周的流? 有没有什么好的例子来展示一个窗口的KStream-to-kStream窗口连接? 在我的例子中,假设我有2个KStream、和我希望能够加入10天的到30天的。

  • 我有一个,它是由一个kafka主题创建的,并且指定了属性。 当我试图创建一个时,会话窗口化了一个查询,如下所示: 我总是得到错误: KSQL不支持对窗口表的持久查询 如何在KSQL中创建开始会话窗口的事件的?

  • 我们需要在一个非常大的窗口中执行kstream-kstream联接,在这个窗口中,左侧的一个刻度只会触发与右侧最新记录的联接,反之亦然。 这不是默认窗口的工作方式,因为中的window.fetch返回的是一个可以包含多条记录的迭代器。 特别是,我们注意到有一个属性设置为true,我们希望它设置为false。 我们如何为KStream KStream join定制存储实现?

  • 我想将一个交易流聚合成相同交易量的窗口,这是区间内所有交易的交易规模之和。 我能够编写一个自定义触发器,将数据分区到Windows中。代码如下: 上面的代码可以将其划分为大致相同大小的窗口: 现在我喜欢对数据进行分区,以便卷与触发器值完全匹配。为此,我需要稍微修改一下数据,方法是将区间结束时的交易分成两部分,一部分属于正在触发的实际窗口,剩余的超过触发器值的数量必须分配给下一个窗口。 那可以用一些

  • 我想使用早期触发逻辑进行窗口聚合(您可以认为聚合是由窗口关闭或特定事件触发的),我阅读了文档:https://ci . Apache . org/projects/flink/flink-docs-release-1.12/dev/stream/operators/windows . html # incremental-window-aggregate-with-aggregate functi

  • 我有两个名为“alarm”和“interprise”的流,它们包含JSON。如果警报器和干预器连接,那么它们将具有相同的钥匙。我想联系他们来检测24小时前没有干预的所有警报。 但这个程序不起作用,结果给我的所有警报就好像24小时前没有干预一样。我重新检查了我的数据集5次,有些警报在警报日期前24小时内进行了干预。 这张图片说明了情况:在此处输入图像描述 因此我需要知道警报之前是否有干预。 程序代码