当前位置: 首页 > 知识库问答 >
问题:

block()/blockfirst()/blocklast()在exchange()之后调用bodyToMono时出现阻塞错误

空浩淼
2023-03-14

我正在尝试使用Webflux将生成的文件流式传输到另一个位置,但是,如果文件的生成遇到了错误,api将返回成功,但是在生成文件时有一个DTO详细说明了错误,而不是文件本身。这是在使用一个非常旧和设计差的api,所以请原谅使用post和api设计。

api调用(exchange())的响应是ClientResponse。在这里,我可以使用bodyToMono转换为ByteArrayResource,该ByteArrayResource可以流式传输到文件,或者,如果在创建文件时出现错误,那么我也可以使用bodyToMono转换为DTO。但是,我似乎不能这样做,也不能依赖于ClientResponse头的内容。

在运行时,我得到一个IllegalStateException

block()/blockfirst()/blocklast()是阻塞的,线程反应器-http-client-epoll-12不支持这一点

我认为我的问题是我不能在同一个函数链中调用block()两次。

我的代码片段是这样的:

webClient.post()
        .uri(uriBuilder -> uriBuilder.path("/file/")
                                      .queryParams(params).build())
        .exchange()
        .doOnSuccess(cr -> {
                if (MediaType.APPLICATION_JSON_UTF8.equals(cr.headers().contentType().get())) {
                    NoPayloadResponseDto dto = cr.bodyToMono(NoPayloadResponseDto.class).block();
                    createErrorFile(dto);
                }
                else {
                    ByteArrayResource bAr = cr.bodyToMono(ByteArrayResource.class).block();
                    createSpreadsheet(bAr);
                }
            }
        )
        .block();

这可能吗?

共有1个答案

微生鸿轩
2023-03-14

首先,一些有助于您理解解决此用例的代码片段的东西。

  1. 不应在返回反应类型的方法中调用阻塞方法;您将阻塞应用程序中为数不多的一个线程,这对应用程序非常不利
  2. 无论如何,从Reactor 3.2开始,在反应管道中阻塞会引发错误
  3. 按照注释中的建议调用subscribe也不是一个好主意。这或多或少类似于在单独的线程中作为任务启动该作业。完成后,您将得到一个回调(subscribe方法可以被赋予lambdas),但实际上您正在将当前管道与该任务解耦。在这种情况下,在您有机会读取完整的响应正文并将其写入文件之前,可以关闭客户端HTTP响应并清理资源
  4. 如果您不想在内存中缓冲整个响应,Spring提供databuffer(考虑可以池化的ByteBuffer实例)。
  5. 如果正在实现的方法本身是阻塞的(例如返回void),则可以调用block,例如在测试用例中。

下面是一个代码片段,您可以使用它来完成此操作:

Mono<Void> fileWritten = WebClient.create().post()
        .uri(uriBuilder -> uriBuilder.path("/file/").build())
        .exchange()
        .flatMap(response -> {
            if (MediaType.APPLICATION_JSON_UTF8.equals(response.headers().contentType().get())) {
                Mono<NoPayloadResponseDto> dto = response.bodyToMono(NoPayloadResponseDto.class);
                return createErrorFile(dto);
            }
            else {
                Flux<DataBuffer> body = response.bodyToFlux(DataBuffer.class);
                return createSpreadsheet(body);
            }
        });
// Once you get that Mono, you should give plug it into an existing
// reactive pipeline, or call block on it, depending on the situation

正如您所看到的,我们没有阻塞任何地方,处理I/O的方法返回mono ,这是done(error)回调的反应性等价物,它在事情完成和是否发生错误时发出信号。

因为我不确定createErrorFile方法应该做什么,所以我提供了createsPreadSheet的示例,它只是将正文字节写入文件。请注意,由于Databuffer可能被回收/池化,我们需要在完成后释放它们。

private Mono<Void> createSpreadsheet(Flux<DataBuffer> body) {
    try {
        Path file = //...
        WritableByteChannel channel = Files.newByteChannel(file, StandardOpenOption.WRITE);
        return DataBufferUtils.write(body, channel).map(DataBufferUtils::release).then();
    } catch (IOException exc) {
        return Mono.error(exc);
    }
}

通过此实现,您的应用程序将在给定时间在内存中保存几个databuffer实例(由于性能原因,反应运算符是预取值),并将以反应方式写入字节。

 类似资料:
  • 问题内容: 我正在尝试使用Webflux将生成的文件流式传输到另一个位置,但是,如果文件的生成遇到错误,则api返回成功,但是DTO在生成文件而不是文件本身时会详细说明错误。这使用的是非常古老且设计不佳的api,因此请原谅post和api设计的使用。 api调用(exchange())的响应是ClientResponse。从这里,我可以使用bodyToMono转换为ByteArrayResourc

  • 我在Spring Webflow中执行阻塞操作时遇到了一个小问题。我检索文章文档列表,并从文章文档列表中,我想更新另一个对象。 当我执行以下操作时,有时它会工作,有时会抛出“block()/block First()/block Last()被阻塞,这在线程reactor-超文本传输协议-nio-2中是不支持的”。你能建议如何修复吗?我真的不想让它阻塞,但不知道如何继续。stackoverflow

  • 本文向大家介绍node.js回调函数之阻塞调用与非阻塞调用,包括了node.js回调函数之阻塞调用与非阻塞调用的使用技巧和注意事项,需要的朋友参考一下 首先,node.js作为javascript运行平台,它采用了事件驱动和异步编程的方式,通过事件注册和异步函数,开发人员可以提高资源利用率,服务器的性能也能得到改善。其次,对于前端人来说,node.js作为js的运行平台,我们可以通过编写系统级或者

  • 我在Spring批处理应用程序中使用Spring WebFlux WebClient,当我调用block时,我遇到了错误。代码非常简单,但是当我尝试从批处理作业中的控制器上的 Rest 终结点启动作业的应用程序时,我收到错误。 其余终结点如下所示: 这是调用远程客户端以获取产品目录信息的方法,控制器可以使用这些信息来加载有关产品的信息 findProductInfo方法包装在一个Service中,

  • 我完全混淆了,,。 哪个是阻塞,哪个不是? 我的意思是如果我使用父进程是否等待子进程返回/才继续执行。 如何影响这些调用?

  • 问题内容: 我正在通过TCP / IP套接字读取数据流。流负载非常不均匀。有时每秒会有大量数据到达,有时一个小时没有数据到达。在长时间不活动的情况下(远程服务器没有数据,但连接仍然在线),我的程序应采取一些措施。 我正在使用select()实现超时。它告诉我是否已经准备好数据,但是我不知道在不引起read()阻塞的情况下我可以读取多少数据。阻止是无法接受的,因为它的持续时间可能远远超过我需要的超时