尝试合并多个 Kafka 流,聚合
val streams = requestStreams.merge(successStreams).merge(errorStreams)
.groupByKey(Grouped.with(Serdes.String(), serdesConfig.notificationSerde()))
.windowedBy(TimeWindows.of(Duration.ofMinutes(10)))
.aggregate({ null }, StreamAggregators.notificationMetricAggregator, Materialized.`as`<String, NotificationMetric, WindowStore<Bytes, ByteArray>>("ag-store")
.withValueSerde(serdesConfig.notificationMetricSerde()))
.toStream()
streams.to(notificationStreamsConfig.metricsTopic, Produced.with(WindowedSerdes.timeWindowedSerdeFrom(String::class.java, 10), serdesConfig.notificationMetricSerde()))
Kafka Streams默认使用连续更新处理模型。注意,聚合的结果是一个KTable
。此结果表包含每个窗口的一行,每次处理新记录时,都会更新窗口(即表中的行)。
如果您调用Ktable#toStream()
,您将获得表的更改日志流,其中包含表的每次更新的记录。
如果希望每个窗口只获得一个结果,可以使用< code>suppress()运算符来获得第二个< code>KTable,即< code>suppress()获取第一个< code>KTable的changelog流,并等待直到窗口关闭,然后只将最终结果插入其输出< code>KTable中。如果使用< code>suppress(),应该将上行窗口聚合的宽限期(默认值为24小时)设置为一个较低的值,即< code>TimeWindows.of(...).格蕾丝(...)。
有关更多详细信息,请查看此博客文章:https://www.confluent.io/blog/kafka-streams-take-on-watermarks-and-triggers
基于apache Kafka文档,我的问题是如何控制窗口的大小?保持主题上的数据的大小是一样的吗?或者例如,我们可以将数据保留一个月,但只加入过去一周的流? 有没有什么好的例子来展示一个窗口的KStream-to-kStream窗口连接? 在我的例子中,假设我有2个KStream、和我希望能够加入10天的到30天的。
我有一个,它是由一个kafka主题创建的,并且指定了属性。 当我试图创建一个时,会话窗口化了一个查询,如下所示: 我总是得到错误: KSQL不支持对窗口表的持久查询 如何在KSQL中创建开始会话窗口的事件的?
我们需要在一个非常大的窗口中执行kstream-kstream联接,在这个窗口中,左侧的一个刻度只会触发与右侧最新记录的联接,反之亦然。 这不是默认窗口的工作方式,因为中的window.fetch返回的是一个可以包含多条记录的迭代器。 特别是,我们注意到有一个属性设置为true,我们希望它设置为false。 我们如何为KStream KStream join定制存储实现?
我想将一个交易流聚合成相同交易量的窗口,这是区间内所有交易的交易规模之和。 我能够编写一个自定义触发器,将数据分区到Windows中。代码如下: 上面的代码可以将其划分为大致相同大小的窗口: 现在我喜欢对数据进行分区,以便卷与触发器值完全匹配。为此,我需要稍微修改一下数据,方法是将区间结束时的交易分成两部分,一部分属于正在触发的实际窗口,剩余的超过触发器值的数量必须分配给下一个窗口。 那可以用一些
我想使用早期触发逻辑进行窗口聚合(您可以认为聚合是由窗口关闭或特定事件触发的),我阅读了文档:https://ci . Apache . org/projects/flink/flink-docs-release-1.12/dev/stream/operators/windows . html # incremental-window-aggregate-with-aggregate functi
我有两个名为“alarm”和“interprise”的流,它们包含JSON。如果警报器和干预器连接,那么它们将具有相同的钥匙。我想联系他们来检测24小时前没有干预的所有警报。 但这个程序不起作用,结果给我的所有警报就好像24小时前没有干预一样。我重新检查了我的数据集5次,有些警报在警报日期前24小时内进行了干预。 这张图片说明了情况:在此处输入图像描述 因此我需要知道警报之前是否有干预。 程序代码