当前位置: 首页 > 知识库问答 >
问题:

KStream加入重试/延迟的KStream

雍焱
2023-03-14

我们正在尝试实现下面描述的用例,我们有我们希望克服的实现问题,

用例,

我们试图通过匹配两个流的消息中的键(JSON)来实现两个Kafka主题之间的KStream连接。此外,我们还应该维护消息序列,因为它是从源代码到达KStream的。

场景是,如果匹配的键还没有到达任何一个流,我们应该停止或重试join,直到预期的键到达其他主题。我们想把不匹配的记录放回KStream,但在这种情况下,序列没有保证。

问题1:如何在其他主题中停止或保持join直到预期的键到达。例如,KTable有密钥100,但是KStream还没有收到密钥100,那么我们应该重试Join或者保持KStream直到密钥100到达。

在KTable和KStream之间的连接上,Java更好,就像我们做的POC一样

KTable<String, String> leftStream = builder.table("stream1");
KStream<String, String> rightStream = builder.stream("stream2");
KStream<String, String> outstream = rightStream.leftJoin(leftStream, (orig_msg, description) -> {
         String new_msg = "";
            if (description != null) {
                  new_msg = orig_msg+"-->Matched--"+description;
            }else {
                new_msg = orig_msg+"-->UnMatched<--"+description;
            }
                return new_msg;
     });

共有1个答案

钱京
2023-03-14

在示例中,您正在执行KStream-to-KTable的左联接。Kafka Streams join语义指定:(a)只有到达KStream的数据才会触发join输出,并且(b)如果当新的KStream事件到达时,KTable(join的右侧)中没有匹配的数据,那么仍然会立即产生一个join输出,但表端数据的输出为null(即,KTable端不会等待数据到达)。

问题1:如何在其他主题中停止或保持join直到预期的键到达。例如,KTable有密钥100,但是KStream还没有收到密钥100,那么我们应该重试Join或者保持KStream直到密钥100到达。

首先,不能使用内置的Kafka Streams功能停止或保留连接。

其次,您提供的特定示例在实践中不会发生,因为(参见上文)到达KTable的事件不会产生连接输出。只有当事件到达KStream时,才会(a)查找KTable和(b)生成连接输出,而不管(a)的结果如何。

但是在KStream-KTable左联接中可能发生的是相反的例子:KStream有密钥100,但是KTable尚未接收密钥100。这个怎么处理?见下文。

问题2:有没有办法在KStream(延迟的KStream)中放入延迟或间隔来接收有延迟时间或间隔的消息。

有一个示例应用程序演示了这一点,恰巧与上面的用例类似:请参阅https://github.com/confluentinc/kafka-streams-examples(直接链接到ConfluentV5.2.1/Apache Kafka 2.2的CustomStreamTableJoin示例)的CustomStreamTableJoin

 类似资料:
  • 描述 (Description) 延迟加载可应用于图像,背景图像和淡入效果,如下所述 - 对于图像 要在图像上使用延迟加载,请按照给定的步骤进行操作 - 使用data-src属性而不是src属性来指定图像源。 将类lazy添加到图像。 <div class = "page-content"> ... <img data-src = "image_path.jpg" class = "l

  • 问题内容: 如果这是完全相同的内容,请纠正我,我知道这个话题经常被讨论,但是找不到确切的答案。 问题: 在MVC Web应用程序中处理Hibernate对象的最佳实用解决方案是什么? 细节: 我正在使用Hibernate,并希望在可能的情况下利用延迟加载。 我正在使用MVC风格的Webapp。 我讨厌获得延迟加载初始化异常。 我讨厌不得不在事务之间重新连接Hibernate对象。 选项: 渴望装载

  • 我正在使用Microsoft Azure ServiceBus对队列消息进行排队,并使用WCF对订阅进行排队。我正在尝试实现重试逻辑。我使用Peak/Lock查看消息,然后必须对消息进行一些本地处理。如果处理失败,我将解锁消息,以便再次尝试处理它。问题是我需要在处理尝试之间建立一个延迟。当前,它被弹出回队列,然后几乎立即被处理。两次尝试之间需要大约2分钟。

  • 问题内容: 我在JPA实体中的延迟加载属性有问题。我读过许多类似的问题,但它们与spring或hibernate有关,并且他们的后代不适用或没有帮助。 该应用程序是在Wildfly应用程序服务器上运行的JEE和JPA2.1。有两个实体,DAO会话bean和servlet将它们放在一起: 当我运行此代码时,它失败并显示: 我对WebLogic / JPA1使用了非常相似的模式,并且运行平稳。任何的想

  • 问题内容: 我想知道在node.js中使用是否等效于延迟加载? 例如,如果我有一个函数需要代码中其他任何地方都不需要的特定node.js包,那么我最好在该函数内部使用它,以便仅在调用该函数时才包含所需的包。 我还不确定是否会由于缺乏对node.js架构的了解而在性能方面有所改善?我想它每次与服务器的连接都会使用更少的内存。但是,当它必须读取程序包时,它会增加磁盘的I / O吗,还是将其添加到内存中

  • 这是从这里开始的后续行动。 我正在实现一个表,它将数据异步加载到表单元格中。问题是,表单元格有时无法正确更新。有时它会以某种方式“挂起”并永远显示“加载...”。实际值只有在我在表中滚动一点时才会更新。 要复制,请在表格中快速向下滚动应用程序。某些单元格不会显示“延迟加载”值,而是显示占位符字符串。 延迟加载属性如下所示: } 应用程序如下所示: 完整的可运行代码可以在这里找到。