我正在尝试使用Webflux将生成的文件流式传输到另一个位置,但是,如果文件的生成遇到错误,则api返回成功,但是DTO在生成文件而不是文件本身时会详细说明错误。这使用的是非常古老且设计不佳的api,因此请原谅post和api设计的使用。
api调用(exchange())的响应是ClientResponse。从这里,我可以使用bodyToMono转换为ByteArrayResource并将其传输到文件中,或者,如果在创建文件时出错,那么我也可以使用bodyToMono转换为DTO。但是,我似乎既不执行任何操作,也不依赖于ClientResponse标头的内容。
在运行时,我收到了一个IllegalStateException引起的
block()/ blockFirst()/ blockLast()正在阻塞,线程反应器-http-client-epoll-12不支持
我认为我的问题是我无法在同一功能链中两次调用block()。
我的代码段如下所示:
webClient.post()
.uri(uriBuilder -> uriBuilder.path("/file/")
.queryParams(params).build())
.exchange()
.doOnSuccess(cr -> {
if (MediaType.APPLICATION_JSON_UTF8.equals(cr.headers().contentType().get())) {
NoPayloadResponseDto dto = cr.bodyToMono(NoPayloadResponseDto.class).block();
createErrorFile(dto);
}
else {
ByteArrayResource bAr = cr.bodyToMono(ByteArrayResource.class).block();
createSpreadsheet(bAr);
}
}
)
.block();
基本上,我想根据标头中定义的MediaType来不同地处理ClientResponse。
这可能吗?
首先,一些事情将帮助您理解解决此用例的代码片段。
subscribe
如评论中所建议的那样,调用也不是一个好主意。它或多或少像在一个单独的线程中将该任务作为任务开始。完成后,您将获得一个回调(subscribe
可以给这些方法指定lambdas),但实际上您正在将当前管道与该任务分离。在这种情况下,在您有机会读取完整的响应正文并将其写入文件之前,可能会关闭客户端HTTP响应并清理资源。DataBuffer
(请考虑可以缓冲的ByteBuffer实例)。void
例如返回),例如在测试用例中,则可以调用阻塞。这是一个可用于执行此操作的代码段:
Mono<Void> fileWritten = WebClient.create().post()
.uri(uriBuilder -> uriBuilder.path("/file/").build())
.exchange()
.flatMap(response -> {
if (MediaType.APPLICATION_JSON_UTF8.equals(response.headers().contentType().get())) {
Mono<NoPayloadResponseDto> dto = response.bodyToMono(NoPayloadResponseDto.class);
return createErrorFile(dto);
}
else {
Flux<DataBuffer> body = response.bodyToFlux(DataBuffer.class);
return createSpreadsheet(body);
}
});
// Once you get that Mono, you should give plug it into an existing
// reactive pipeline, or call block on it, depending on the situation
如您所见,我们不会在任何地方阻塞,并且处理I /
O的方法将返回Mono<Void>
,这与done(error)
回调的反应性等效,该回调表示何时完成操作以及是否发生错误。
由于不确定该createErrorFile
方法应该做什么,因此提供了一个示例,createSpreadsheet
该示例仅将主体字节写入文件中。请注意,由于数据缓冲区可能被回收/池化,因此我们需要在完成后释放它们。
private Mono<Void> createSpreadsheet(Flux<DataBuffer> body) {
try {
Path file = //...
WritableByteChannel channel = Files.newByteChannel(file, StandardOpenOption.WRITE);
return DataBufferUtils.write(body, channel).map(DataBufferUtils::release).then();
} catch (IOException exc) {
return Mono.error(exc);
}
}
通过此实现,您的应用程序将DataBuffer
在给定的时间在内存中保存几个实例(反应式运算符出于性能原因正在预取值),并将以反应式方式写入字节。
我正在尝试使用Webflux将生成的文件流式传输到另一个位置,但是,如果文件的生成遇到了错误,api将返回成功,但是在生成文件时有一个DTO详细说明了错误,而不是文件本身。这是在使用一个非常旧和设计差的api,所以请原谅使用post和api设计。 api调用(exchange())的响应是ClientResponse。在这里,我可以使用bodyToMono转换为ByteArrayResource,
我在Spring Webflow中执行阻塞操作时遇到了一个小问题。我检索文章文档列表,并从文章文档列表中,我想更新另一个对象。 当我执行以下操作时,有时它会工作,有时会抛出“block()/block First()/block Last()被阻塞,这在线程reactor-超文本传输协议-nio-2中是不支持的”。你能建议如何修复吗?我真的不想让它阻塞,但不知道如何继续。stackoverflow
我完全混淆了,,。 哪个是阻塞,哪个不是? 我的意思是如果我使用父进程是否等待子进程返回/才继续执行。 如何影响这些调用?
本文向大家介绍node.js回调函数之阻塞调用与非阻塞调用,包括了node.js回调函数之阻塞调用与非阻塞调用的使用技巧和注意事项,需要的朋友参考一下 首先,node.js作为javascript运行平台,它采用了事件驱动和异步编程的方式,通过事件注册和异步函数,开发人员可以提高资源利用率,服务器的性能也能得到改善。其次,对于前端人来说,node.js作为js的运行平台,我们可以通过编写系统级或者
我在Spring批处理应用程序中使用Spring WebFlux WebClient,当我调用block时,我遇到了错误。代码非常简单,但是当我尝试从批处理作业中的控制器上的 Rest 终结点启动作业的应用程序时,我收到错误。 其余终结点如下所示: 这是调用远程客户端以获取产品目录信息的方法,控制器可以使用这些信息来加载有关产品的信息 findProductInfo方法包装在一个Service中,
在上一节中,我们看到了 take Effect 让我们可以在一个集中的地方更好地去描述一个非常规的流程。 重温一下登录流程示例: function* loginFlow() { while(true) { yield take('LOGIN') // ... perform the login logic yield take('LOGOUT') // ...