当前位置: 首页 > 知识库问答 >
问题:

使用RSocket和Project Reactor实现202接受后重试行为

夏季萌
2023-03-14

我正在实现一个典型的用例,其中客户端请求异步生成的资源。因此,立即生成并返回resourceID:

1. CLIENT ---(POST /request-resource)--->  SERVER
2. SERVER (Generates resID, launches async process) ---(202 Accepted - resID)---> CLIENT

此时,服务器中有一个后台任务,该任务将最终生成一个结果,并将其存储在与RESID相关联的数据库中。客户端将定期请求资源,重试直到资源可用:

3. CLIENT ---(/resource/resID)--->  SERVER (checks in Postgres using reactive driver)
4. SERVER ---(404 - Retry-After 5)---> CLIENT
5. CLIENT ---(/resource/resID)--->  SERVER (checks in Postgres using reactive driver)
6. SERVER ---(200 - JSON Payload)---> CLIENT

我认为RSocket是一个完美的选择,以避免在资源可用之前进行无休止的客户端重试(步骤3.on)。

哪种交互模型会更适合这个问题,我如何实现它?

考虑存储库如下:ResourceRepository.mono getResult(String resID)

如果我选择了请求/响应交互模型,那么我的情况与前面一样。除非有办法让一个单声道重试直到有结果。这可能吗?

对于Request/Stream,我可以使用response.status=processing返回类似Flux 的结果,直到对Postgre的查询返回一个结果,然后Flux将有一个response.status=ok的元素,并且Flux将完成。需要一个最大的时间来完成在配置的周期内没有结果的通量。既然如此,我怎么能这样做呢?

我需要创建一个Flux,该Flux周期性地发出(具有最大周期超时),在存储库返回空Mono时有一个没有结果的元素,或者在te存储库有它时有一个实际值,完成Flux。

共有1个答案

陆雅志
2023-03-14

此问题的解决方案使用RSocket和RequestResponse交互模型,该交互模型等待资源在DB中可用。关键是使用repeatwhenempty运算符:

    @MessageMapping("request-resource")
    fun getResourceWebSocket(resourceRequest: ResourceRequest): Mono<Resource> {
        return resourceService.sendResourceRequestProcessing(resourceRequest)
    }

    override fun sendResourceRequestMessage(resourceRequest: ResourceRequest): Mono<Resource> {
        val resourceId = randomUUID().toString()
        return Mono.fromCallable {
            sendKafkaResourceProcessingRequestMessage(resourceId, resourceRequest)
        }.then(poolResourceResponse(resourceId))
    }
    private fun poolResourceResponse(resourceId: String): Mono<Resource> {
        return resourceRepository.findByResourceId(resourceId)
                .repeatWhenEmpty(30) { longFlux ->
                    longFlux.delayElements(Duration.ofSeconds(1))
                            .doOnNext { logger.info("Repeating {}", it) }
                }
    }
 类似资料:
  • 对于,您可以指定订阅时发生的自定义操作。例如

  • 我有简单的Spring启动RSocket服务 连接2个Spring服务很容易,但我的客户端应用程序没有Spring,我的客户端应该在RSocket java中 我很难理解如何将(路由,如Spring RocketRequest ester)消息发送到该特定通道。 客户端代码应为: 有可能订阅Spring频道吗?

  • 问题内容: 我很好奇如何正确使用和。 views / orders / new.html.erb views / order_details / _details.html.erb controllers / orders_controller.rb (我很确定这是错误的……这里的任何帮助将不胜感激) 模型/ order.rb 我能够使部分音色演奏得更好的唯一方法是,如果我实际上叫as 。但这根本

  • RSocket 就是为服务而设计的。它是面向连接的、消息驱动的协议,内置了应用程序级的流控制。它在浏览器中和在服务器上一样工作。事实上,Web 浏览器可以服务于后端微服务的流量。它也是二进制的。它可以同样好地处理文本和二进制数据,并且可以分解有效工作负载。它将应用程序中的所有交互建模为网络原语。这意味着,你可以流化数据或执行发布 / 订阅,而无需设置应用程序队列。 目前提供 Java、JavaSc

  • 试图实现硒cucumber测试而不是JUnit。 我的疑问是 testng中@Runwith(Cucumber.class)的替代选项是什么

  • HTTP 202 Accepted响应状态码指示该请求已被接收但尚未起作用。它是非承诺的,这意味着HTTP中没有办法稍后发送指示处理请求结果的异步响应。它适用于其他进程或服务器处理请求或批处理的情况。 状态 202 Accepted 规范 规范 标题 RFC 7231,第6.3.3节:202接受 超文本传输协议(HTTP / 1.1):语义和内容