我知道subscribeOn在序列被subscribe时会切换执行线程,但我发现它不能与ServerRequest.bodyTomono/flux一起工作
类似于
Flux.just(1,2,3)
.doOnNext(integer -> log.info("test {}",integer))
.subscribeOn(Schedulers.elastic())
.subscribe();
将使执行线程更改
INFO 23313 --- [ elastic-2] c.a.p.m.f.service.router.TestService : test 1
INFO 23313 --- [ elastic-2] c.a.p.m.f.service.router.TestService : test 2
INFO 23313 --- [ elastic-2] c.a.p.m.f.service.router.TestService : test 3
但让我困惑的是
@Configuration
public class TestRouter {
@Bean
public RouterFunction<ServerResponse> testRouterFunction(TestService testService) {
return route().path("/test", builder -> builder.nest(accept(MediaType.ALL),
route -> route.PUT("/", req -> {
Mono<String> valueMono = req.bodyToMono(String.class);
return ServerResponse.ok().body(testService.test(valueMono), String.class);
}))).build();
}
}
@Service
@Slf4j
public class TestService {
public Mono<String> test(Mono<String> mono) {
return mono
.doOnSubscribe(subscription -> log.info("on subscribe"))
.subscribeOn(Schedulers.elastic())
.doOnNext(s -> log.info("received {}", s))
.subscribeOn(Schedulers.elastic());
}
}
subscribeOn
INFO 23200 --- [ctor-http-nio-4] c.a.p.m.f.service.router.TestService : on subscribe
INFO 23200 --- [ctor-http-nio-4] c.a.p.m.f.service.router.TestService : received test
感谢@MichaelBerry@SimonBaslé,你们两个帮了我很大的忙
简而言之,reactor-netty将覆盖http订阅的subscribeOn,使用FlatMap()
在不同的Mono/Flux
或Publishon()
上包含单独的subscribeOn()
可以完成我想要的工作
这不是您可以更改的--只有在调用subscribe()
之前,链中的最后一个subscribeon()
调用才是受尊重的,因此这取决于WebFlux使用它想要的任何调度程序。在本例中,它看起来像是在NIO驱动的事件循环或类似的循环中处理请求。
但是,您可以在链中包含flatmap()
调用,您可以为此指定一个单独的subscribeon()
,该调用不会被覆盖。根据您的用例,这可能是一个选项,因为您可以在flatmap()
调用中定义的发布服务器中完成大部分工作。
我们在Spring WebFlux中使用反应性Spring数据存储库,我对SubscribeOn的理解是,它决定SubscribeOn之前的操作符将在flux中执行哪个线程池,而PublishOn则决定订阅将在哪个线程池上执行。然而,在下面的代码中,即使使用PublishOn和SubscribeOn,代码也不会在主线程上执行,而是返回到cluster-nio-worker-1。 另外,thread
我有以下代码: 第三方DK。doSomeAction回调在主线程上发布,因此发射器也将在主线程上发出,而不是在订阅线程上发出(如果我在flatMap中进一步进行一些网络交互,链将失败)。 如果我在第一个之后添加,它会切换到正确的线程,但是有没有办法在正确的线程上发出?我不能修改行为。
subscribeOn 指定 Observable 在那个 Scheduler 执行 ReactiveX 使用 Scheduler 来让 Observable 支持多线程。你可以使用 subscribeOn 操作符,来指示 Observable 在哪个 Scheduler 执行。 observeOn 操作符非常相似。它指示 Observable 在哪个 Scheduler 发出通知。 默认情况下,
我试图在不同的IO线程中运行块和块。但是输出是这样的: 如您所见,调用似乎对流没有影响,引用调用方法的线程。 另外,请考虑下面的代码: 这些代码的问题是什么?谢了。
我做的样本项目,以实现顶部菜单。 下面是项目结构。 TopMenu是结构如下所示的父视图。 在TopViewController中,下面的viewDidLoad是我拥有的。 现在,当我执行项目时,我看到的是深灰色,但当我单击按钮时,什么都没有发生(iAction没有被调用)。 我所期望的是,当我单击clickme时,我将在日志中看到clickme==深灰色。 为什么会这样? 正如MidhunMP所
我正在使用Spring 5 WebClient进行一个外部api调用,希望将响应映射到这样的Object: 但是我收到一个错误: 但是,如果我像这样将响应正文提取到字符串: 然后它就正常工作了。有什么办法解决这个问题吗? 编辑: 响应正文: 标题: