当前位置: 首页 > 知识库问答 >
问题:

如何发送时间窗口KTable的最终kafka-streams聚合结果?

充星腾
2023-03-14

我想做的是:

    < li >消费数字主题中的记录(Long的) < li >聚合(计数)每个5秒窗口的值 < li >将最终聚合结果发送到另一个主题

我的代码如下:

KStream<String, Long> longs = builder.stream(
            Serdes.String(), Serdes.Long(), "longs");

// In one ktable, count by key, on a five second tumbling window.
KTable<Windowed<String>, Long> longCounts = 
            longs.countByKey(TimeWindows.of("longCounts", 5000L));

// Finally, sink to the long-avgs topic.
longCounts.toStream((wk, v) -> wk.key())
          .to("long-counts");

看起来一切都像预期的那样工作,但聚合被发送到每个传入记录的目标主题。我的问题是如何仅发送每个窗口的最终聚合结果?

暂时还没有答案

 类似资料:
  • 我们正在使用Kafka流的会话窗口来聚合相关事件的到达。除了聚合之外,我们还使用API指定窗口的保留时间。流信息: 会话窗口(非活动时间)为1分钟,传递到的保留时间为2分钟。我们使用定制的来映射事件的时间。 示例: 事件:e1;事件时间:上午10:00:00;到达时间:下午2点(同一天) 事件:e2;事件时间:上午10:00:30;到达时间:下午2:10(同一天) 第二个事件的到达时间是e1到达后

  • 我正在编写一个Kafka Streams应用程序。它执行以下步骤“1)消耗输入数据2)在1小时窗口内根据新密钥消重记录3)重新选择密钥4)在1小时窗口内计数密钥5)发送到下游。 我是Kafka流的新手。我的理解是,为了将窗口保持为1小时,我将 也设置为1小时。这是正确的做法吗? 一旦我部署了我的应用程序与真实的流量,它似乎应用程序不断发送消息,而我认为它每小时只会发送一堆消息? 感谢任何帮助!!

  • 我有一个事件流,我想聚集基于时间窗口。我的解决方案提供增量聚合,而不是在定时窗口上提供聚合。我读到过,这对于stream来说是正常的,因为它会以更改日志的形式给出结果。另外,在研究过程中,我遇到了两步窗口聚合与Kafka Streams DSL和如何发送最终的kafka-streams聚合结果的时间窗口Ktable?.但是第一篇文章中的解决方案有些过时(使用不推荐的API)。我使用了在那些不推荐的

  • 我能做到写作和阅读的中间主题: 有没有简单的方法从中获取?这是我第一个使用Kafka Streams的应用程序,所以我可能错过了一些明显的东西。

  • 我有一个KTable,数据如下所示(key=>value),其中keys是客户ID,而value是包含一些客户数据的小型JSON对象: 我想对这个KTable做一些聚合,基本上保留每个的记录数。所需的KTable数据如下所示: 假设属于上面的组,她的生日使她进入了新的年龄组。支持第一个KTable的状态存储现在应该如下所示: 我希望得到的聚合KTable结果反映这一点。例如。 我可能过度概括了这里