当前位置: 首页 > 知识库问答 >
问题:

如何在Spring Webflux/Reactor Netty web应用程序中执行阻塞调用

鲁浩言
2023-03-14

在我的用例中,我有一个带有Reactor Netty的Spring Webflux微服务,我有以下依赖项:

    null

由于reactor-core(版本0.7.6版)已包含在最新的spring-boot-dependencies(版本2.0.1版)中,因此不能再使用:block()/blockfirst()/blocklast()are blocking(线程xxx不支持),请参阅->https://github.com/reactor/reactor-netty/issues/312

我的代码段:

java prettyprint-override">public Mono<FooBar> getFooBar(Foo foo) {
    MultiValueMap<String, String> parameters = new LinkedMultiValueMap<>();
    parameters.add("size", foo.getSize());
    parameters.addAll("bars", barReactiveCrudRepository.findAllByIdentifierIn(foo.getBarIdentifiers()) // This obviously returns a Flux
        .map(Bar::toString)
        .collectList()
        .block());

    String url = UriComponentsBuilder.fromHttpUrl("https://base-url/")
        .port(8081)
        .path("/foo-bar")
        .queryParams(parameters)
        .build()
        .toString();

    return webClient.get()
        .uri(url)
        .retrieve()
        .bodyToMono(FooBar.class);
}

这适用于spring-boot版本2.0.0.release,但由于升级到版本2.0.1.release,因此从ractor-core升级到版本0.7.6.release,因此不允许再这样做了。

我看到的唯一真正的解决方案是包括一个块(非反应的)存储库/mongo客户机,但我不确定是否鼓励这样做。有什么建议吗?

共有1个答案

湛光华
2023-03-14

webclient不接受其请求URL的publisher类型,但没有什么可以阻止您执行以下操作:

public Mono<FooBar> getFooBar(Foo foo) {

    Mono<List<String>> bars = barReactiveCrudRepository
        .findAllByIdentifierIn(foo.getBarIdentifiers())
        .map(Bar::toString)
        .collectList();

    Mono<FooBar> foobar = bars.flatMap(b -> {

        MultiValueMap<String, String> parameters = new LinkedMultiValueMap<>();
        parameters.add("size", foo.getSize());
        parameters.addAll("bars", b);

        String url = UriComponentsBuilder.fromHttpUrl("https://base-url/")
            .port(8081)
            .path("/foo-bar")
            .queryParams(parameters)
            .build()
            .toString();

        return webClient.get()
            .uri(url)
            .retrieve()
            .bodyToMono(FooBar.class);
    });
    return foobar;         
}

如果有的话,这个新的Reactor核心检查使您避免了在WebFlux处理程序中间使用这个阻塞调用而使整个应用程序崩溃。

 类似资料:
  • 如另一个问题中所述,当使用Undertow时,所有处理都应该在专用的工作线程池中完成,如下所示: 我知道可用于显式地告诉Undertow在专用的线程池中调度请求以阻止请求。我们可以通过将包装在实例中来修改上面的示例,如下所示: 调用此方法将exchange置于阻塞模式,并创建一个BlockingHttpExchange对象来存储流。当交换处于阻塞模式时,输入流方法变得可用,除了阻塞和非阻塞模式之间

  • 我正在使用Spring Webflux和Spring数据jpa,使用PostgreSql作为后端数据库。我不想在进行诸如查找和保存之类的db调用时阻塞主线程。为了实现同样的目标,我在Controller类中有一个主调度器,在服务类中有一个jdbcScheduler。 我定义它们的方式是: 现在,当在我的服务层中进行获取/保存调用时,我这样做: 在控制器中,我执行以下操作: 这是正确的吗?和/或有更

  • 当我使用ExecutorService进行异步调用时,它会返回Future对象。根据它返回的布尔值,我必须记录异步调用的状态。 但当我试图从future对象调用get方法时,它会阻止主线程的执行。 是否可以解除屏蔽主线程执行?

  • 我完全混淆了,,。 哪个是阻塞,哪个不是? 我的意思是如果我使用父进程是否等待子进程返回/才继续执行。 如何影响这些调用?

  • 我需要在后台调用一个调用webservice的API。我不想将(非常复杂的)方法转换为异步方法,只想说“在后台完成所有这些”。 但是我迷失在如何用F#做到这一点。这就是我所拥有的:

  • Vext.x核心手册建议使用< code>executeBlocking()执行阻塞代码,以防止事件循环被阻塞。尽管如此,它还指出: 阻止代码 [] 应阻止合理的时间(即不超过几秒钟)。长阻塞操作...被排除在外。当阻止操作持续超过10秒时,将在控制台上打印一条消息[...]。长阻塞操作应使用由应用程序管理的专用线程,该线程可以使用事件总线或 runOnContext 与顶点进行交互 所以我不能在