当前位置: 首页 > 知识库问答 >
问题:

在KStream-KStream联接中自定义窗口存储实现

周鸿运
2023-03-14

我们需要在一个非常大的窗口中执行kstream-kstream联接,在这个窗口中,左侧的一个刻度只会触发与右侧最新记录的联接,反之亦然。

这不是默认窗口的工作方式,因为KStreamKStreamJoinProcessor中的window.fetch返回的WindowstReterator 是一个可以包含多条记录的迭代器。

特别是,我们注意到rockdbwindowstore有一个retainduplicates属性设置为true,我们希望它设置为false。

我们如何为KStream KStream join定制存储实现?

共有1个答案

姚晋
2023-03-14

最简单的方法可能是将代码复制到一个具有新名称的类中,并相应地更改逻辑?另一种可能是将这两个流转换为ktables并执行表-表连接(您需要禁用两个输入ktables的缓存)。

但是请注意,对于您想要的连接类型,很难正确处理无序的数据

 类似资料:
  • 我正在使KStream-KStream连接,其中创建2个内部主题。而KStream-KTable join将创建1个内部主题+1个表。 就性能和其他因素而言,哪个更好?

  • 基于apache Kafka文档,我的问题是如何控制窗口的大小?保持主题上的数据的大小是一样的吗?或者例如,我们可以将数据保留一个月,但只加入过去一周的流? 有没有什么好的例子来展示一个窗口的KStream-to-kStream窗口连接? 在我的例子中,假设我有2个KStream、和我希望能够加入10天的到30天的。

  • 尝试合并多个 Kafka 流,聚合

  • 我有两个名为“alarm”和“interprise”的流,它们包含JSON。如果警报器和干预器连接,那么它们将具有相同的钥匙。我想联系他们来检测24小时前没有干预的所有警报。 但这个程序不起作用,结果给我的所有警报就好像24小时前没有干预一样。我重新检查了我的数据集5次,有些警报在警报日期前24小时内进行了干预。 这张图片说明了情况:在此处输入图像描述 因此我需要知道警报之前是否有干预。 程序代码

  • 我试图用GlobalKTable连接KStream,连接不完全在键上。 我想通过empIdOverLoginUserId的值通过employeesDetails的键将empIdOverLoginUserId与employeesDetails连接

  • 我已经创建了要将它们连接在一起的kstream。两个流的输出如下所示: 流1: 流2: 我想创建这两个Stream的连接流(内连接),所以我创建了以下KStream: 在这个KStream中,我只使用了一个连接,我正在更改输出消息的格式,仅此而已。 通过一个例子,我将解释我想做什么: 在窗口内发布以下消息: 流1 流2 加入流 出版的是什么 我想出版什么 总之,我只想在窗口中发布最新消息,而不是所