我正在尝试使用Webflux将生成的文件流式传输到另一个位置,但是,如果文件的生成遇到了错误,api将返回成功,但是在生成文件时有一个DTO详细说明了错误,而不是文件本身。这是在使用一个非常旧和设计差的api,所以请原谅使用post和api设计。
api调用(exchange())的响应是ClientResponse。在这里,我可以使用bodyToMono转换为ByteArrayResource,该ByteArrayResource可以流式传输到文件,或者,如果在创建文件时出现错误,那么我也可以使用bodyToMono转换为DTO。但是,我似乎不能这样做,也不能依赖于ClientResponse头的内容。
在运行时,我得到一个IllegalStateException
block()/blockfirst()/blocklast()是阻塞的,线程反应器-http-client-epoll-12不支持这一点
我认为我的问题是我不能在同一个函数链中调用block()两次。
我的代码片段是这样的:
webClient.post()
.uri(uriBuilder -> uriBuilder.path("/file/")
.queryParams(params).build())
.exchange()
.doOnSuccess(cr -> {
if (MediaType.APPLICATION_JSON_UTF8.equals(cr.headers().contentType().get())) {
NoPayloadResponseDto dto = cr.bodyToMono(NoPayloadResponseDto.class).block();
createErrorFile(dto);
}
else {
ByteArrayResource bAr = cr.bodyToMono(ByteArrayResource.class).block();
createSpreadsheet(bAr);
}
}
)
.block();
这可能吗?
首先,一些有助于您理解解决此用例的代码片段的东西。
subscribe
也不是一个好主意。这或多或少类似于在单独的线程中作为任务启动该作业。完成后,您将得到一个回调(subscribe
方法可以被赋予lambdas),但实际上您正在将当前管道与该任务解耦。在这种情况下,在您有机会读取完整的响应正文并将其写入文件之前,可以关闭客户端HTTP响应并清理资源databuffer
(考虑可以池化的ByteBuffer实例)。void
),则可以调用block,例如在测试用例中。下面是一个代码片段,您可以使用它来完成此操作:
Mono<Void> fileWritten = WebClient.create().post()
.uri(uriBuilder -> uriBuilder.path("/file/").build())
.exchange()
.flatMap(response -> {
if (MediaType.APPLICATION_JSON_UTF8.equals(response.headers().contentType().get())) {
Mono<NoPayloadResponseDto> dto = response.bodyToMono(NoPayloadResponseDto.class);
return createErrorFile(dto);
}
else {
Flux<DataBuffer> body = response.bodyToFlux(DataBuffer.class);
return createSpreadsheet(body);
}
});
// Once you get that Mono, you should give plug it into an existing
// reactive pipeline, or call block on it, depending on the situation
正如您所看到的,我们没有阻塞任何地方,处理I/O的方法返回mono
,这是done(error)
回调的反应性等价物,它在事情完成和是否发生错误时发出信号。
因为我不确定createErrorFile
方法应该做什么,所以我提供了createsPreadSheet
的示例,它只是将正文字节写入文件。请注意,由于Databuffer可能被回收/池化,我们需要在完成后释放它们。
private Mono<Void> createSpreadsheet(Flux<DataBuffer> body) {
try {
Path file = //...
WritableByteChannel channel = Files.newByteChannel(file, StandardOpenOption.WRITE);
return DataBufferUtils.write(body, channel).map(DataBufferUtils::release).then();
} catch (IOException exc) {
return Mono.error(exc);
}
}
通过此实现,您的应用程序将在给定时间在内存中保存几个databuffer
实例(由于性能原因,反应运算符是预取值),并将以反应方式写入字节。
问题内容: 我正在尝试使用Webflux将生成的文件流式传输到另一个位置,但是,如果文件的生成遇到错误,则api返回成功,但是DTO在生成文件而不是文件本身时会详细说明错误。这使用的是非常古老且设计不佳的api,因此请原谅post和api设计的使用。 api调用(exchange())的响应是ClientResponse。从这里,我可以使用bodyToMono转换为ByteArrayResourc
我在Spring Webflow中执行阻塞操作时遇到了一个小问题。我检索文章文档列表,并从文章文档列表中,我想更新另一个对象。 当我执行以下操作时,有时它会工作,有时会抛出“block()/block First()/block Last()被阻塞,这在线程reactor-超文本传输协议-nio-2中是不支持的”。你能建议如何修复吗?我真的不想让它阻塞,但不知道如何继续。stackoverflow
本文向大家介绍node.js回调函数之阻塞调用与非阻塞调用,包括了node.js回调函数之阻塞调用与非阻塞调用的使用技巧和注意事项,需要的朋友参考一下 首先,node.js作为javascript运行平台,它采用了事件驱动和异步编程的方式,通过事件注册和异步函数,开发人员可以提高资源利用率,服务器的性能也能得到改善。其次,对于前端人来说,node.js作为js的运行平台,我们可以通过编写系统级或者
我在Spring批处理应用程序中使用Spring WebFlux WebClient,当我调用block时,我遇到了错误。代码非常简单,但是当我尝试从批处理作业中的控制器上的 Rest 终结点启动作业的应用程序时,我收到错误。 其余终结点如下所示: 这是调用远程客户端以获取产品目录信息的方法,控制器可以使用这些信息来加载有关产品的信息 findProductInfo方法包装在一个Service中,
我完全混淆了,,。 哪个是阻塞,哪个不是? 我的意思是如果我使用父进程是否等待子进程返回/才继续执行。 如何影响这些调用?
问题内容: 我正在通过TCP / IP套接字读取数据流。流负载非常不均匀。有时每秒会有大量数据到达,有时一个小时没有数据到达。在长时间不活动的情况下(远程服务器没有数据,但连接仍然在线),我的程序应采取一些措施。 我正在使用select()实现超时。它告诉我是否已经准备好数据,但是我不知道在不引起read()阻塞的情况下我可以读取多少数据。阻止是无法接受的,因为它的持续时间可能远远超过我需要的超时