当前位置: 首页 > 知识库问答 >
问题:

Kafka流窗口加入保留

薛修能
2023-03-14

我们正在使用kafka streams的windows join连接2个流,我们想知道:

  • 为什么KS要在内部主题上增加24小时?例如,我们有一个1小时的窗口,但内部主题保留25小时。我们可以将其配置为不添加这些24小时吗

[更新]

例如,我们创建JoinWindow如下:

JoinWindows.of(300000).before(600000).until(3600000)

虽然我可以看到内部主题(JOINTHIS和OUTEROTHER)是用

Configs:retention.ms=90000000

这是刚刚在我的机器上的一个空代理(使用confluent cli工具)上测试的

共有1个答案

汪鸿波
2023-03-14

我将部分回答我自己关于24小时的问题:事实上,留档在这里清楚地谈到了这一点:https://kafka.apache.org/10/documentation/streams/developer-guide/processor-api.html#fault-tolerant-state-stores:

默认保留设置为Windows#maintainMs()1天。您可以通过指定StreamsConfig来覆盖此设置。StreamsConfig中的WINDOW\u STORE\u CHANGE\u LOG\u ADDITIONAL\u RETENTION\u MS\u CONFIG。

这是关于WINDOW\u STORE\u CHANGE\u LOG\u ADDITIONAL\u RETENTION\u MS\u CONFIG的Javadoc

 类似资料:
  • 你好,我正在Kafka会话窗口上工作,非活动时间为5分钟。我想要一些反馈,当达到非活动时间时,会话会因按键而降低。假设我有 (A,1) 记录“A”是键的位置。现在,如果我在5分钟内没有获得任何“A”键记录,则会话将被删除。 我想在会话结束时做一些操作,比如说(值)*2。有没有什么方法可以通过Kafka Stream API实现这一点

  • 我有一个关于Kafka流的时间窗的问题,有些概念真的让我困惑。 我们有一个主题每天获得1000万个事件,我们有6天的日志保留,所以总的主题包含6000万个事件。 现在,我创建了一个KTable,我正在执行load all操作并迭代事件。正如我之前提到的,实际上我们只是当前的事件,而不是6千万事件,所以我在KTable定义中窗口化了这些数据。 现在,当我用以下语句加载所有事件时,一切都运行良好。 问

  • 我正在尝试加入apache flink中的两个流以获得一些结果。 我的项目的当前状态是,我正在获取twitter数据并将其映射到一个2元组中,其中保存用户的语言和定义时间窗口中的推文总和。我这样做是为了每种语言的推文数量和每种语言的转发。推文/转发聚合在其他进程中运行良好。 我现在想得到一个时间窗口内转发次数占所有推文次数的百分比。 因此我使用以下代码: 当我打印或时,输出似乎很好。我的问题是我从

  • 我有一个KStream KStream DSL如下所示: 阅读一些文章(例如Kafka流窗口) 但我想补充一点,这对我来说并不适用: Java编译器抛出以下错误: 老实说,我不明白。参数是正确的;虚拟现实类型是“历史”。 你知道我错过了什么吗? 这个windowedBy KTable的想法是让一个状态为一件“事情”保存所有事件一天。假设生成了一个新警报,我想将一天内“某物”的所有事件附加到警报上。

  • 我们正在使用Kafka流的会话窗口来聚合相关事件的到达。除了聚合之外,我们还使用API指定窗口的保留时间。流信息: 会话窗口(非活动时间)为1分钟,传递到的保留时间为2分钟。我们使用定制的来映射事件的时间。 示例: 事件:e1;事件时间:上午10:00:00;到达时间:下午2点(同一天) 事件:e2;事件时间:上午10:00:30;到达时间:下午2:10(同一天) 第二个事件的到达时间是e1到达后

  • 我正在对kafka流进行窗口聚合。它工作正常并进行正确的聚合。这是scala中的代码。是一个案例类。 当我试图从KSQLDB中读取这个内容时,当我创建一个关于这个主题的流时,我看到rowkey的值如下