当前位置: 首页 > 知识库问答 >
问题:

subscribeOn在ServerRequest的bodyToMono上使用时不工作

仲霍英
2023-03-14

我知道subscribeOn在序列被subscribe时会切换执行线程,但我发现它不能与ServerRequest.bodyTomono/flux一起工作

类似于

Flux.just(1,2,3)
          .doOnNext(integer -> log.info("test {}",integer))
          .subscribeOn(Schedulers.elastic())
          .subscribe();

将使执行线程更改

INFO 23313 --- [      elastic-2] c.a.p.m.f.service.router.TestService     : test 1
INFO 23313 --- [      elastic-2] c.a.p.m.f.service.router.TestService     : test 2
INFO 23313 --- [      elastic-2] c.a.p.m.f.service.router.TestService     : test 3

但让我困惑的是

@Configuration
public class TestRouter {
    @Bean
    public RouterFunction<ServerResponse> testRouterFunction(TestService testService) {
        return route().path("/test", builder -> builder.nest(accept(MediaType.ALL),
          route -> route.PUT("/", req -> {
              Mono<String> valueMono = req.bodyToMono(String.class);
              return ServerResponse.ok().body(testService.test(valueMono), String.class);
          }))).build();
    }
}
@Service
@Slf4j
public class TestService {
    public Mono<String> test(Mono<String> mono) {
        return mono
          .doOnSubscribe(subscription -> log.info("on subscribe"))
          .subscribeOn(Schedulers.elastic())
          .doOnNext(s -> log.info("received {}", s))
          .subscribeOn(Schedulers.elastic());
    }
}
subscribeOn
INFO 23200 --- [ctor-http-nio-4] c.a.p.m.f.service.router.TestService     : on subscribe
INFO 23200 --- [ctor-http-nio-4] c.a.p.m.f.service.router.TestService     : received test

感谢@MichaelBerry@SimonBaslé,你们两个帮了我很大的忙

简而言之,reactor-netty将覆盖http订阅的subscribeOn,使用FlatMap()在不同的Mono/FluxPublishon()上包含单独的subscribeOn()可以完成我想要的工作

共有1个答案

孟征
2023-03-14

这不是您可以更改的--只有在调用subscribe()之前,链中的最后一个subscribeon()调用才是受尊重的,因此这取决于WebFlux使用它想要的任何调度程序。在本例中,它看起来像是在NIO驱动的事件循环或类似的循环中处理请求。

但是,您可以在链中包含flatmap()调用,您可以为此指定一个单独的subscribeon(),该调用不会被覆盖。根据您的用例,这可能是一个选项,因为您可以在flatmap()调用中定义的发布服务器中完成大部分工作。

 类似资料:
  • 我们在Spring WebFlux中使用反应性Spring数据存储库,我对SubscribeOn的理解是,它决定SubscribeOn之前的操作符将在flux中执行哪个线程池,而PublishOn则决定订阅将在哪个线程池上执行。然而,在下面的代码中,即使使用PublishOn和SubscribeOn,代码也不会在主线程上执行,而是返回到cluster-nio-worker-1。 另外,thread

  • 我有以下代码: 第三方DK。doSomeAction回调在主线程上发布,因此发射器也将在主线程上发出,而不是在订阅线程上发出(如果我在flatMap中进一步进行一些网络交互,链将失败)。 如果我在第一个之后添加,它会切换到正确的线程,但是有没有办法在正确的线程上发出?我不能修改行为。

  • subscribeOn 指定 Observable 在那个 Scheduler 执行 ReactiveX 使用 Scheduler 来让 Observable 支持多线程。你可以使用 subscribeOn 操作符,来指示 Observable 在哪个 Scheduler 执行。 observeOn 操作符非常相似。它指示 Observable 在哪个 Scheduler 发出通知。 默认情况下,

  • 我试图在不同的IO线程中运行块和块。但是输出是这样的: 如您所见,调用似乎对流没有影响,引用调用方法的线程。 另外,请考虑下面的代码: 这些代码的问题是什么?谢了。

  • 我做的样本项目,以实现顶部菜单。 下面是项目结构。 TopMenu是结构如下所示的父视图。 在TopViewController中,下面的viewDidLoad是我拥有的。 现在,当我执行项目时,我看到的是深灰色,但当我单击按钮时,什么都没有发生(iAction没有被调用)。 我所期望的是,当我单击clickme时,我将在日志中看到clickme==深灰色。 为什么会这样? 正如MidhunMP所

  • 我正在使用Spring 5 WebClient进行一个外部api调用,希望将响应映射到这样的Object: 但是我收到一个错误: 但是,如果我像这样将响应正文提取到字符串: 然后它就正常工作了。有什么办法解决这个问题吗? 编辑: 响应正文: 标题: