当前位置: 首页 > 知识库问答 >
问题:

Kafka流聚合器-如何设置时间等待在聚合之前吐出消息?

谭勇
2023-03-14

(0_10,..)(0_11,..)-->(0,[10])(0,[10,11])

我想知道如何控制聚合时间窗口,这样它就不会为每个传入的消息吐出一条消息,而是等待并聚合其中的一些消息。想象一下流应用程序使用这些消息:

  • (0_10,..)
  • (1_11,..)
  • (0_13,..)
    null

我的代码很简单

builder
    .table(keySerde, valueSerde, sourceTopic)
    .groupBy(StreamBuilder::groupByMapper)
    .aggregate(
        StreamBuilder::aggregateInitializer,
        StreamBuilder::aggregateAdder,
        StreamBuilder::aggregateSubtractor)
    .to(...);

目前,它有时会批量聚合,但不确定如何调整它:

{"Aggregate":[100]}
{"Aggregate":[100,300,301,302]}
{"Aggregate":[100,300,301,302,404]}

共有1个答案

邓德厚
2023-03-14

我想知道如何控制聚合时间窗口,这样它就不会为每个传入的消息吐出一条消息,而是等待并聚合其中的一些消息。

这是不可能的Kafka流的窗口。一般来说,Kafka Streams窗口不会“关闭”或“结束”,因为一旦窗口“关闭”,你就不能告诉它产生最终结果(没有这样的概念)。这是为了适应迟来的结果。当消息到达聚合窗口时,您将看到更新。Kafka流输出更新的频率取决于缓存(见下文)。有关更多信息,请参见:如何发送时间窗口Ktable的最终kafka-streams聚合结果?

目前,它有时会批量聚合,但不确定如何调整它:

 类似资料:
  • 我想连接两个主题流(左连接),并在连接的流上进行基于窗口的聚合。然而,聚合将某些消息计数两倍,因为在连接期间,根据正确主题中的延迟,某些消息将发出两倍。以下是POC的代码。 它是否可以修复以避免因连接而重复?

  • 我有一个KStream,其中包含从主题到1的数据,如下所示: 和KTable,构造如下: 稍后,主题To2中出现以下消息: 现在,我希望我的KTable能够反映这些变化,并且看起来像这样: 但看起来是这样的: 我想我缩小了范围:显然聚合的只在第一次调用--之后聚合总是接收作为最后一个参数,例如。 其中,在第一次调用(通过初始值设定项创建)时为,但在第二次调用时为。 有什么想法吗? 编辑2 编辑3

  • 我有一个条目主题,其中我从传感器接收数据。通常,我收到的数据如下所示: 为了稍后在拓扑中进行一些计算,我需要构建一个映射,其中包含从每个捕获者接收到的所有最后值。 关键字:项目id值:{ 为了做到这一点,我在传感器主题和聚合主题之间进行了连接,连接的结果是聚合主题中的post。 ------ 传感器(KStream)-| -------聚合(KTable)---| 更新:以下是实现这种连接的jav

  • 我有一个事件流,我想聚集基于时间窗口。我的解决方案提供增量聚合,而不是在定时窗口上提供聚合。我读到过,这对于stream来说是正常的,因为它会以更改日志的形式给出结果。另外,在研究过程中,我遇到了两步窗口聚合与Kafka Streams DSL和如何发送最终的kafka-streams聚合结果的时间窗口Ktable?.但是第一篇文章中的解决方案有些过时(使用不推荐的API)。我使用了在那些不推荐的

  • 我有一个KTable,数据如下所示(key=>value),其中keys是客户ID,而value是包含一些客户数据的小型JSON对象: 我想对这个KTable做一些聚合,基本上保留每个的记录数。所需的KTable数据如下所示: 假设属于上面的组,她的生日使她进入了新的年龄组。支持第一个KTable的状态存储现在应该如下所示: 我希望得到的聚合KTable结果反映这一点。例如。 我可能过度概括了这里

  • 我试图从聚合原理的角度来理解火花流。Spark DF 基于迷你批次,计算在特定时间窗口内出现的迷你批次上完成。 假设我们有数据作为- 然后首先对Window_period_1进行计算,然后对Window_period_2进行计算。如果我需要将新的传入数据与历史数据一起使用,比如说Window_priod_new与Window_pperid_1和Window_perid_2的数据之间的分组函数,我该