我们正在使用kafka streams的windows join连接2个流,我们想知道:
[更新]
例如,我们创建JoinWindow如下:
JoinWindows.of(300000).before(600000).until(3600000)
虽然我可以看到内部主题(JOINTHIS和OUTEROTHER)是用
Configs:retention.ms=90000000
这是刚刚在我的机器上的一个空代理(使用confluent cli工具)上测试的
我将部分回答我自己关于24小时的问题:事实上,留档在这里清楚地谈到了这一点:https://kafka.apache.org/10/documentation/streams/developer-guide/processor-api.html#fault-tolerant-state-stores:
默认保留设置为Windows#maintainMs()1天。您可以通过指定StreamsConfig来覆盖此设置。StreamsConfig中的WINDOW\u STORE\u CHANGE\u LOG\u ADDITIONAL\u RETENTION\u MS\u CONFIG。
这是关于WINDOW\u STORE\u CHANGE\u LOG\u ADDITIONAL\u RETENTION\u MS\u CONFIG的Javadoc
你好,我正在Kafka会话窗口上工作,非活动时间为5分钟。我想要一些反馈,当达到非活动时间时,会话会因按键而降低。假设我有 (A,1) 记录“A”是键的位置。现在,如果我在5分钟内没有获得任何“A”键记录,则会话将被删除。 我想在会话结束时做一些操作,比如说(值)*2。有没有什么方法可以通过Kafka Stream API实现这一点
我有一个关于Kafka流的时间窗的问题,有些概念真的让我困惑。 我们有一个主题每天获得1000万个事件,我们有6天的日志保留,所以总的主题包含6000万个事件。 现在,我创建了一个KTable,我正在执行load all操作并迭代事件。正如我之前提到的,实际上我们只是当前的事件,而不是6千万事件,所以我在KTable定义中窗口化了这些数据。 现在,当我用以下语句加载所有事件时,一切都运行良好。 问
我正在尝试加入apache flink中的两个流以获得一些结果。 我的项目的当前状态是,我正在获取twitter数据并将其映射到一个2元组中,其中保存用户的语言和定义时间窗口中的推文总和。我这样做是为了每种语言的推文数量和每种语言的转发。推文/转发聚合在其他进程中运行良好。 我现在想得到一个时间窗口内转发次数占所有推文次数的百分比。 因此我使用以下代码: 当我打印或时,输出似乎很好。我的问题是我从
我有一个KStream KStream DSL如下所示: 阅读一些文章(例如Kafka流窗口) 但我想补充一点,这对我来说并不适用: Java编译器抛出以下错误: 老实说,我不明白。参数是正确的;虚拟现实类型是“历史”。 你知道我错过了什么吗? 这个windowedBy KTable的想法是让一个状态为一件“事情”保存所有事件一天。假设生成了一个新警报,我想将一天内“某物”的所有事件附加到警报上。
我们正在使用Kafka流的会话窗口来聚合相关事件的到达。除了聚合之外,我们还使用API指定窗口的保留时间。流信息: 会话窗口(非活动时间)为1分钟,传递到的保留时间为2分钟。我们使用定制的来映射事件的时间。 示例: 事件:e1;事件时间:上午10:00:00;到达时间:下午2点(同一天) 事件:e2;事件时间:上午10:00:30;到达时间:下午2:10(同一天) 第二个事件的到达时间是e1到达后
我正在对kafka流进行窗口聚合。它工作正常并进行正确的聚合。这是scala中的代码。是一个案例类。 当我试图从KSQLDB中读取这个内容时,当我创建一个关于这个主题的流时,我看到rowkey的值如下