当前位置: 首页 > 知识库问答 >
问题:

我可以使用从Spring5的WebClient返回的Flux的block()方法吗?

臧威
2023-03-14

我创建了spring boot 2.0演示应用程序,其中包含两个使用WebClient进行通信的应用程序。当我从WebClient的响应中使用Flux的block()方法时,它们经常停止通信。由于某些原因,我想使用列表而不是通量。

服务器端应用程序是这样的。它只返回Flux对象。

@GetMapping
public Flux<Item> findAll() {
    return Flux.fromIterable(items);
}

而客户端(或BFF端)应用程序是这样的。我从服务器获取Flux,并通过调用block()方法将其转换为List。

@GetMapping
public List<Item> findBlock() {
    return webClient.get()
        .retrieve()
        .bodyToFlux(Item.class)
        .collectList()
        .block(Duration.ofSeconds(10L));
}

虽然它一开始工作得很好,但findBlock()不会响应,并且在多次访问之后会超时。当我修改findBlock()方法以返回删除collectList()和block()的Flux时,它工作得很好。然后我假设block()方法导致了这个问题。
并且,当我修改findAll()方法以返回List时,没有任何改变。

这里是整个示例应用程序的源代码。
https://github.com/cero-t/webclient-example

“资源”是服务器端应用程序,“前端”是客户端应用程序。运行完这两个应用程序后,当我访问localhost:8080时,它工作得很好,我可以随时重新加载,但当我访问localhost:8080/block时,它似乎工作得很好,但几次重新加载后,它就没有响应了。

顺便说一下,当我将“spring-boot-starter-web”依赖项添加到“前端”应用程序(而不是资源应用程序)的pom.xml(这意味着我使用tomcat)时,这个问题从未发生过。这个问题是由于Netty服务器造成的吗?

如有任何指导,将不胜感激。

共有1个答案

通典
2023-03-14

首先,让我指出,只有在从内存中提取items时,才建议使用flux.fromiterable(items)(不涉及I/O)。否则,您可能会使用阻塞API来获取它--这可能会破坏您的反应性应用程序。在这种情况下,这是一个内存中的列表,所以没有问题。注意,您还可以使用flux.just(item1,item2,item3)

使用以下方法是最有效的:

@GetMapping("/")
public Flux<Item> findFlux() {
  return webClient.get()
    .retrieve()
    .bodyToFlux(Item.class);
}

item实例将以非常有效的方式进行读/写、解码/编码。

另一方面,这也不是首选的方式:

@GetMapping("/block")
public List<Item> findBlock() {
  return webClient.get()
    .retrieve()
    .bodyToFlux(Item.class)
    .collectList()
    .block(Duration.ofSeconds(10L));
}

在这种情况下,前端应用程序使用collectlist在内存中缓冲整个项目列表,但同时也阻塞了为数不多的可用服务器线程之一。这可能会导致非常差的性能,因为您的服务器在等待该数据时可能会被阻塞,并且无法同时为其他请求提供服务。

在这种特殊情况下,情况更糟,因为应用程序完全中断。看中控台,我们可以看到以下几点:

WARN 3075 --- [ctor-http-nio-7] io.netty.util.concurrent.DefaultPromise  : An exception was thrown by reactor.ipc.netty.channel.PooledClientContextHandler$$Lambda$532/356589024.operationComplete()

reactor.core.Exceptions$BubblingException: java.lang.IllegalArgumentException: Channel [id: 0xab15f050, L:/127.0.0.1:59350 - R:localhost/127.0.0.1:8081] was not acquired from this ChannelPool
    at reactor.core.Exceptions.bubble(Exceptions.java:154) ~[reactor-core-3.1.3.RELEASE.jar:3.1.3.RELEASE]

这可能与reactor-netty客户端连接池问题有关,该问题应该在0.7.4版本中修复。我不知道这方面的具体情况,但我怀疑整个连接池会被破坏,因为HTTP响应没有正确地从客户端连接中读取。

添加spring-boot-starter-web确实使您的应用程序使用Tomcat,但它主要是将您的spring WebFlux应用程序变成一个spring MVC应用程序(它现在支持一些反应返回类型,但有一个不同的运行时模型)。如果希望使用Tomcat测试应用程序,可以将spring-boot-starter-tomcat添加到POM中,这将使用Tomcat和spring WebFlux。

 类似资料:
  • 我试图获取值(字符串)使用Spring WebFlux WebClient,(使用SpringBoot版本2.4.5,) 但在误差以下 块方法参考-https://docs.spring.io/spring-framework/docs/current/reference/html/web-reactive.html#webflux-client-synchronous

  • 我正在编写一个方法,旨在递归地搜索嵌套集合中的值并返回包含该值的集合: 这很好,但当我使用该方法时,如果不先将返回的可选项赋给如下变量,我就无法直接处理它: 在这种情况下,编译器无法知道方法的返回类型。我不知道有什么好的可能使它生效。 我想出了另外两种方法,也不是很好: > 但我不喜欢这样,因为额外的参数(实际上)是不必要的,并且使方法不容易理解。 而不是可选,我只返回集合对象。但在这种情况下,我

  • SubjectUtils是具有上述方法的类。然而,即使getAttribute返回一个空列表,我不应该期望它重新运行字符串列表吗?(testData)

  • 如何正确处理由期货构建的Monos? 我试着让我的头脑围绕着Spring Reactive(和Spring 5),观看所有的视频,阅读所有我能找到的博客,但他们似乎都没有做一些事情,而不仅仅是查询数据库或其他琐碎的事情。 我正在使用新的AWS 2.0开发工具包,它使用的用于大多数事情。使用服务创建新实例,我的方法如下所示 我在这里的理解是,我几乎立即返回类型的,而将随时执行它的操作。 我从我的路由

  • 我在Spring批处理应用程序中使用Spring WebFlux WebClient,当我调用block时,我遇到了错误。代码非常简单,但是当我尝试从批处理作业中的控制器上的 Rest 终结点启动作业的应用程序时,我收到错误。 其余终结点如下所示: 这是调用远程客户端以获取产品目录信息的方法,控制器可以使用这些信息来加载有关产品的信息 findProductInfo方法包装在一个Service中,

  • 我实现了一个try-catch块。 我试图用一种特定的方式实现捕捉块,但是它不太好用。如果输入不是整数,它应该重复并返回到try块。它只工作一次,但更多。 你能给我一些帮助吗?非常感谢。