当前位置: 首页 > 知识库问答 >
问题:

如何使用netty reactor从阻塞调度器切换回以前的调度器?

江阳夏
2023-03-14

如何使用Spring Webflux Netty reactor从阻塞调度器(阻塞池)切换回以前的调度器(reactor http nio)?

代码:

@RequiredArgsConstructor
@Service
@Slf4j
public class BookService {

    private final IBookRepo bookRepo;

    private final BlockingPoolConfig blockingPoolConfig;

    public Mono<Optional<Book>> getBook(Long id) {
        log.debug("getBook() - id: {}", id);
        return asyncCallable(() -> {
            log.trace("getBook() - invoking bookRepo.findById(id) ...");
            return bookRepo.findById(id);
        });
    }

    protected <S> Mono<S> asyncCallable(Callable<S> callable) {
        return Mono.fromCallable(callable)
                .subscribeOn(blockingPoolConfig.blockingScheduler()); 
    }
}

@RestController
@RequiredArgsConstructor
@Slf4j
public class BookController {

    private final BookService bookService;

    @GetMapping("/book/{id}")
    public Mono<Book> get(@PathVariable Long id) {
        log.debug("get() - id: {}", id);
        return bookService.getBook(id)
                .publishOn(Schedulers.parallel())  //publishOn(... ?)
                .map(optionalBook -> {
                    return optionalBook.map(book -> {
                        log.debug("get() result: {}", book);
                        return book;
                    }).orElseThrow(() -> {
                        log.debug("book with id: {} is not found.", id);
                        return new ResponseStatusException(HttpStatus.NOT_FOUND, "Book not found");
                    });
                });
    }

@Configuration
@Slf4j
public class BlockingPoolConfig {

    @Value("${spring.datasource.maximumPoolSize:8}")
    private int connectionPoolSize = 1;

    @Scope("singleton")
    @Bean
    public Scheduler blockingScheduler() {
        Scheduler scheduler = Schedulers.newBoundedElastic(connectionPoolSize, connectionPoolSize, "blocking-pool");
        return scheduler;
    }
}

上面我使用的是publishOn(Schedulers.parallel()),但这一个创建了新的线程池(parallel)。相反,我更喜欢切换反应器http nio线程池。

实际结果日志:

19:17:45.290 [reactor-http-nio-2       ] DEBUG t.a.p.controller.BookController    - get() - id: 1
19:17:45.291 [reactor-http-nio-2       ] DEBUG t.a.p.service.BookService          - getBook() - id: 1
19:17:45.316 [blocking-pool-1          ] TRACE t.a.p.service.BookService          - getBook() - invoking bookRepo.findById(id) ...
19:17:45.427 [parallel-2               ] DEBUG t.a.p.controller.BookController    - get() result: Book(id=1, title=Abc)

预期结果日志:

19:17:45.290 [reactor-http-nio-2       ] DEBUG t.a.p.controller.BookController    - get() - id: 1
19:17:45.291 [reactor-http-nio-2       ] DEBUG t.a.p.service.BookService          - getBook() - id: 1
19:17:45.316 [blocking-pool-1          ] TRACE t.a.p.service.BookService          - getBook() - invoking bookRepo.findById(id) ...
19:17:45.427 [reactor-http-nio-2       ] DEBUG t.a.p.controller.BookController    - get() result: Book(id=1, title=Abc)

共有1个答案

冷涵忍
2023-03-14

这目前是不可能的,因为A)这些HTTP线程不是由反应器调度程序控制的,而是由底层的Netty事件循环本身控制的;B)如果某个线程没有关联的执行器服务,Java中没有通用的方法“将执行返回到(任意)线程”。

对于reactor-netty,一旦您退出了HTTP线程,就应该没有什么理由想要切换回netty线程。一旦发送响应,反应器netty将自然完成。

假设阻塞池类似于Schedulers.boundedElastic(),您可能确实希望转到Schedulers.parallel()来限制阻塞线程的生命周期,这是一个非常好的解决方案。

 类似资料:
  • 本文向大家介绍node.js回调函数之阻塞调用与非阻塞调用,包括了node.js回调函数之阻塞调用与非阻塞调用的使用技巧和注意事项,需要的朋友参考一下 首先,node.js作为javascript运行平台,它采用了事件驱动和异步编程的方式,通过事件注册和异步函数,开发人员可以提高资源利用率,服务器的性能也能得到改善。其次,对于前端人来说,node.js作为js的运行平台,我们可以通过编写系统级或者

  • 进程是操作系统虚拟出来的概念,用来组织计算机中的任务。但随着进程被赋予越来越多的任务,进程好像有了真实的生命,它从诞生就随着CPU时间执行,直到最终消失。不过,进程的生命都得到了操作系统内核的关照。就好像疲于照顾几个孩子的母亲内核必须做出决定,如何在进程间分配有限的计算资源,最终让用户获得最佳的使用体验。内核中安排进程执行的模块称为调度器(scheduler)。这里将介绍调度器的工作方式。 进程状

  • 我不能让非常基本的底拖示例按预期工作。下面的代码应该允许服务并发HTTP请求。实际发生的情况是,只有一个请求被处理,在第一个请求完成之前,其他请求都无法通过。 增加XNIO线程数及其工作线程不会改变任何事情。一个随机的XNIO被分配给请求。然后在调度后分配一个随机工作者。服务器一直处于阻塞状态,直到请求得到服务。

  • Storm 现在有 4 种内置的 schedulers(调度器): DefaultScheduler, IsolationScheduler, MultitenantScheduler, ResourceAwareScheduler. Pluggable scheduler(可插拔的调度器) 你可以实现你自己的 scheduler(调度器)来替换掉默认的 scheduler(调度器),自定义分配e

  • 调度器提供了同步递增策略变化的方法。 它应以手工艺等一致性算法为基础,以确保所有执行者的一致性和一致性。 通过调度器用户们可以轻松地建立分布式集群。 调度器的方法分为两部分。 第一种是与Casbin相结合的方法。 这些方法应该在Casbin内部调用。 用户们可以使用由Casbin本身提供的更完整的api。 另一个部分是调度器本身定义的方法,包括调度器初始化方法, 和不同算法提供的不同函数,如动态资

  • 注:本节未经校验,如有问题欢迎提issue 有时需要设定将来发生的事情,这时该怎么办? ActorSystem 就能搞定一切! 在那儿你能找到 scheduler 方法,它返回一个 akka.actor.Scheduler 实例, 这个实例在每个Actor系统里是唯一的,用来在内部指定一段时间后发生的行为。 请注意定时任务是使用 ActorSystem 的 MessageDispatcher 执行