当前位置: 首页 > 知识库问答 >
问题:

如何在spring cloud stream内置的Kafka streaming (kstreams)应用中实现延迟重试机制?

上官和惬
2023-03-14

我有一个场景,比如。

有4个主题处理主题,异常主题,重试主题和拒绝主题。我有一个spring cloud stream应用程序,它有一个使用Kstream的处理器。该处理器从异常主题中读取消息,并基于每个消息中可用的标志,为重试主题和拒绝主题创建两个kstream分支。现在需要做的是,retry主题中出现的任何消息都必须等待一段特定的时间,然后才能将其推回处理主题。有没有人能帮我在spring cloud stream的一个kafka stream应用里做的最好的设计或者解决方案是什么?可以用flux和Mono的异步机制来设计吗?任何资源或指导将是一个很大的帮助。谢谢

共有1个答案

太叔鸿博
2023-03-14

我们不能在Spring Cloud Stream Kafka Streams应用程序中混合反应类型。基本上,您只能将Kafka流类型(如< code>KStream或< code>KTable)作为输入/输出绑定。如果您想在将它发送到处理主题之前引入一个延迟,为什么不能使用类似< code > ScheduleExecutorService 的东西,然后只在初始延迟之后调用服务呢?下面是一个潜在解决方案的示例伪代码。

ScheduledExecutorService executorService = Executors.newSingleThreadScheduledExecutor();
...
...

.branch(
   (k, v) -> {
     if (reject flag found) {
       return true;
     }
   }
   (k, v) -> executorService.schedule(() -> true, initialDelay, TimeUnit.SECONDS));
       

在第一个分支中,我们检查记录是否绑定到拒绝状态,如果是,立即将其发送到拒绝主题。否则,它是针对重试主题的,在返回布尔标志之前延迟某个值(我猜这是在您的应用程序中外部配置的)。延迟完成后,布尔值< code>true将返回,到达分支的记录将被发送到重试主题。

请记住,我还没有尝试过这段代码,所以请注意第二个过滤器中的任何竞争条件,在这种情况下,我们以异步方式将事情交给调度器,但该线程上下文仍应保留当前的<code>(k,v)</code>对。请尝试使用一些测试数据,看看它是否符合您的要求。

 类似资料:
  • 我有一个基于Spring boot的KStreams应用程序,我在其中加入跨多个主题的数据。当一个主题出现延迟时,处理情况的最佳实践是什么?我读过一些链接,比如如何管理Kafka KStream到KStream窗口连接?和其他人。 下面是我的示例代码(Spring Boot应用程序),用于为两个主题--雇员和财务--生成模拟数据。下面是员工主题的代码: 对于金融主题也是如此:

  • 问题内容: 我想在更改背景之间的命令之间设置延迟。我尝试使用线程计时器,并尝试使用运行和捕获。但这不起作用。我试过了 但这只是变成黑色。 问题答案: 试试这个代码:

  • 我一直在学习spring Webflux和reactive programming,并陷入了一个问题,我试图解决的重试逻辑使用spring WebClient。我已经创建了一个客户机,并成功地调用了一个外部Web服务GETendpoint,该endpoint返回一些JSON数据。 当外部服务以状态响应时,响应包括标头,该标头具有一个值,指示在重试请求之前应等待多长时间。我想在spring WebF

  • 本文向大家介绍kafka如何实现延迟队列?相关面试题,主要包含被问及kafka如何实现延迟队列?时的应答技巧和注意事项,需要的朋友参考一下 Kafka并没有使用JDK自带的Timer或者DelayQueue来实现延迟的功能,而是基于时间轮自定义了一个用于实现延迟功能的定时器(SystemTimer)。JDK的Timer和DelayQueue插入和删除操作的平均时间复杂度为O(nlog(n)),并不

  • 问题内容: 在早期版本的Swift中,可以使用以下代码创建延迟: 但现在,斯威夫特3时,Xcode自动改变6个不同的东西,但随后出现以下错误:“无法转换到预期值又名”。 在Swift 3中运行一系列代码之前,如何创建延迟? 问题答案: 经过大量研究,我终于找到了答案。 这会在Swift 3和Swift 4中创建所需的“等待”效果。

  • 本文向大家介绍如何理解Hibernate的延迟加载机制?在实际应用中,延迟加载与Session关闭的矛盾是如何处理的?相关面试题,主要包含被问及如何理解Hibernate的延迟加载机制?在实际应用中,延迟加载与Session关闭的矛盾是如何处理的?时的应答技巧和注意事项,需要的朋友参考一下 考察点:hibernate框架 参考回答: 延迟加载就是并不是在读取的时候就把数据加载进来,而是等到使用时再