当前位置: 首页 > 知识库问答 >
问题:

Kafka流-抑制到窗口结束(不关闭)

关玄裳
2023-03-14

我正在窗口流上执行聚合,希望抑制早期聚合结果。我所说的早期结果是指在窗口结束前计算的结果,而不是那些在宽限期内发生的结果。因此,我想用时间戳抑制所有聚合结果

最小Kafka流拓扑示例:

new StreamsBuilder()
        .stream("my-topic")
        .windowedBy(TimeWindows.of(myWindowSize).grace(myGracePeriod))
        .reduce(myReducer)
        .suppress( /* searched for*/ )
        .toStream();

因此,被抑制。直到Windows关闭(…)不是我的选择,因为我必须等到宽限期到期,这可能会很长。

根据KIP-328,使用Suppressed可以准确地获得所需的行为。非实时限制(持续时间为0,…) as(引用KIP的描述):

a、 要等待多长时间才能发出更多更新。这是一个时间量,从事件时间(对于常规KTables)或窗口端(对于带窗口的KTables)测量,以缓冲每个键,然后向下游发射。

然而,Kafka Streams JavaDoc以及相应的实现意味着情况并非如此,并且时间限制在接收每个(windows-)键的第一个记录时开始倒计时,而不是在窗口结束时。

我很高兴能澄清这一点,并支持如何实现期望的行为。


共有1个答案

鲁博赡
2023-03-14

KIP描述不正确(我相应地更新了wiki页面)。请注意,再往下看,基普说:

速率限制更新

假设我们希望将KTable的更新速率降低到大约每30个密钥更新一次。我们不想为此使用太多内存,而且我们认为在任何时候都不会有超过1000个密钥的更新。

table
  .suppress(untilTimeLimit(Duration.ofSeconds(30), maxRecords(1000)))
  .toStream(); // etc.

因此,使用untilTimeLimit是用来在一个固定的时间间隔内发射的。对于窗口聚合,时间间隔计时器将在窗口开始时开始-您仍然可以将等待周期设置为窗口大小,以不获得任何“早期”更新,但是您不会在窗口结束后看到每个更新,而是只能在“窗口大小间隔”中看到更新。如果宽限期真的很长,这可能还不够好吗?

你描述的用例目前不支持,但我认为这是一个非常有趣和有用的用例。也许你可以创建一个功能请求票?

 类似资料:
  • 我已经能够创建一个“会话开始信号”流,如本答案所述。 是否可以在每次窗口聚合结束时创建一个“会话结束信号”流?

  • 你好,我正在Kafka会话窗口上工作,非活动时间为5分钟。我想要一些反馈,当达到非活动时间时,会话会因按键而降低。假设我有 (A,1) 记录“A”是键的位置。现在,如果我在5分钟内没有获得任何“A”键记录,则会话将被删除。 我想在会话结束时做一些操作,比如说(值)*2。有没有什么方法可以通过Kafka Stream API实现这一点

  • 我有一个关于Kafka流的时间窗的问题,有些概念真的让我困惑。 我们有一个主题每天获得1000万个事件,我们有6天的日志保留,所以总的主题包含6000万个事件。 现在,我创建了一个KTable,我正在执行load all操作并迭代事件。正如我之前提到的,实际上我们只是当前的事件,而不是6千万事件,所以我在KTable定义中窗口化了这些数据。 现在,当我用以下语句加载所有事件时,一切都运行良好。 问

  • 下面给出的kafka producer程序不是通过Eclipse在Windows中运行的,而是在Unix平台上运行的(即,当我在承载kafka代理的Unix中运行它时,它工作正常)。windows不支持Kafka制作人吗?但是,我可以从windows计算机ping ip地址。请帮忙。 这是我得到的异常错误。 log4j:WARN找不到记录器的附加程序(kafka.utils.VerifiableP

  • 当USB设备连接到Android平板电脑时,会出现一个弹出窗口,要求用户许可。我想抑制它,因为客户端不想要它。我应该怎么做? 在代码中: 调用以授予 USB 设备临时访问权限。 这会弹出一个弹出窗口。默认情况下,如何禁止弹出窗口或授予用户访问权限?

  • 我们正在使用kafka streams的windows join连接2个流,我们想知道: 为什么KS要在内部主题上增加24小时?例如,我们有一个1小时的窗口,但内部主题保留25小时。我们可以将其配置为不添加这些24小时吗 [更新] 例如,我们创建JoinWindow如下: 虽然我可以看到内部主题(JOINTHIS和OUTEROTHER)是用 这是刚刚在我的机器上的一个空代理(使用confluent