当前位置: 首页 > 知识库问答 >
问题:

如何管理Kafka KStream到KStream窗口连接?

翟淇
2023-03-14

基于apache Kafka文档KStream-to-KStream联接始终是窗口联接,我的问题是如何控制窗口的大小?保持主题上的数据的大小是一样的吗?或者例如,我们可以将数据保留一个月,但只加入过去一周的流?

有没有什么好的例子来展示一个窗口的KStream-to-kStream窗口连接?

在我的例子中,假设我有2个KStream、KStream1KStream2我希望能够加入10天的KStream1到30天的KStream2

共有1个答案

李新霁
2023-03-14

那是绝对有可能的。定义流运算符时,可以显式指定联接窗口大小。

KStream stream1 = ...;
KStream stream2 = ...;
long joinWindowSizeMs = 5L * 60L * 1000L; // 5 minutes
long windowRetentionTimeMs = 30L * 24L * 60L * 60L * 1000L; // 30 days

stream1.leftJoin(stream2,
                 ... // add ValueJoiner
                 JoinWindows.of(joinWindowSizeMs)
);

// or if you want to use retention time

stream1.leftJoin(stream2,
                 ... // add ValueJoiner
                 (JoinWindows)JoinWindows.of(joinWindowSizeMs)
                                         .until(windowRetentionTimeMs)
);

有关更多详细信息,请参见http://docs.confluent.io/current/streams/developer-guide.html#joining-streams。

滑动窗口基本上定义了一个附加的联接谓词。在类似SQL的语法中,这类似于:

SELECT * FROM stream1, stream2
WHERE
   stream1.key = stream2.key
   AND
   stream1.ts - before <= stream2.ts
   AND
   stream2.ts <= stream1.ts + after
 类似资料:
  • 尝试合并多个 Kafka 流,聚合

  • Window Manager 是一种特殊的 Xclient。 使用窗口管理器时,Xserver 并不直接与其它 Xclient 通信,而是通过 WM 中转,当一些消息被定义为 WM 指令时,它们会被拦截。例如 Alt+F4 关闭窗口、拖动标题栏…… 消息“打开链接 linuxtoy.org”,具体内容如下:     输入焦点在地址栏的范围内,“linuxtoy.org”,回车      Xserv

  • 窗口的内容: 在 RPG 中必须要使用很多的窗口。这些窗口如何管理呢,下面我们就来讲讲这个问题。 形如Window_Xxxx 的名字的脚本很多吧,但有两个尤其重要,那就是最上面的两个。 Window_Base,基本窗口,其父类是Window Window_Selectable,项目选择窗口,其父类是Window_Base 在 RGSS 中,Window类是预先编好的,它负责确定窗口边界和背景的绘画

  • 我有两个名为“alarm”和“interprise”的流,它们包含JSON。如果警报器和干预器连接,那么它们将具有相同的钥匙。我想联系他们来检测24小时前没有干预的所有警报。 但这个程序不起作用,结果给我的所有警报就好像24小时前没有干预一样。我重新检查了我的数据集5次,有些警报在警报日期前24小时内进行了干预。 这张图片说明了情况:在此处输入图像描述 因此我需要知道警报之前是否有干预。 程序代码

  • Java语言lang.IllegalArgumentException:View=com。Android内部的政策impl。PhoneWindow$DecorView{41c2e378 V.E…..R……D 0,0-450161}未连接到窗口管理器 主要活动: 第二活动 CreateEntry异步onpostexecute 主要活动: 对于 它给出了错误。 过程:com。实例androidwith

  • 我们需要在一个非常大的窗口中执行kstream-kstream联接,在这个窗口中,左侧的一个刻度只会触发与右侧最新记录的联接,反之亦然。 这不是默认窗口的工作方式,因为中的window.fetch返回的是一个可以包含多条记录的迭代器。 特别是,我们注意到有一个属性设置为true,我们希望它设置为false。 我们如何为KStream KStream join定制存储实现?