如何使用Spring Webflux Netty reactor从阻塞调度器(阻塞池)切换回以前的调度器(reactor http nio)?
代码:
@RequiredArgsConstructor
@Service
@Slf4j
public class BookService {
private final IBookRepo bookRepo;
private final BlockingPoolConfig blockingPoolConfig;
public Mono<Optional<Book>> getBook(Long id) {
log.debug("getBook() - id: {}", id);
return asyncCallable(() -> {
log.trace("getBook() - invoking bookRepo.findById(id) ...");
return bookRepo.findById(id);
});
}
protected <S> Mono<S> asyncCallable(Callable<S> callable) {
return Mono.fromCallable(callable)
.subscribeOn(blockingPoolConfig.blockingScheduler());
}
}
@RestController
@RequiredArgsConstructor
@Slf4j
public class BookController {
private final BookService bookService;
@GetMapping("/book/{id}")
public Mono<Book> get(@PathVariable Long id) {
log.debug("get() - id: {}", id);
return bookService.getBook(id)
.publishOn(Schedulers.parallel()) //publishOn(... ?)
.map(optionalBook -> {
return optionalBook.map(book -> {
log.debug("get() result: {}", book);
return book;
}).orElseThrow(() -> {
log.debug("book with id: {} is not found.", id);
return new ResponseStatusException(HttpStatus.NOT_FOUND, "Book not found");
});
});
}
@Configuration
@Slf4j
public class BlockingPoolConfig {
@Value("${spring.datasource.maximumPoolSize:8}")
private int connectionPoolSize = 1;
@Scope("singleton")
@Bean
public Scheduler blockingScheduler() {
Scheduler scheduler = Schedulers.newBoundedElastic(connectionPoolSize, connectionPoolSize, "blocking-pool");
return scheduler;
}
}
上面我使用的是publishOn(Schedulers.parallel()),但这一个创建了新的线程池(parallel)。相反,我更喜欢切换反应器http nio线程池。
实际结果日志:
19:17:45.290 [reactor-http-nio-2 ] DEBUG t.a.p.controller.BookController - get() - id: 1
19:17:45.291 [reactor-http-nio-2 ] DEBUG t.a.p.service.BookService - getBook() - id: 1
19:17:45.316 [blocking-pool-1 ] TRACE t.a.p.service.BookService - getBook() - invoking bookRepo.findById(id) ...
19:17:45.427 [parallel-2 ] DEBUG t.a.p.controller.BookController - get() result: Book(id=1, title=Abc)
预期结果日志:
19:17:45.290 [reactor-http-nio-2 ] DEBUG t.a.p.controller.BookController - get() - id: 1
19:17:45.291 [reactor-http-nio-2 ] DEBUG t.a.p.service.BookService - getBook() - id: 1
19:17:45.316 [blocking-pool-1 ] TRACE t.a.p.service.BookService - getBook() - invoking bookRepo.findById(id) ...
19:17:45.427 [reactor-http-nio-2 ] DEBUG t.a.p.controller.BookController - get() result: Book(id=1, title=Abc)
这目前是不可能的,因为A)这些HTTP线程不是由反应器调度程序控制的,而是由底层的Netty事件循环本身控制的;B)如果某个线程没有关联的执行器服务,Java中没有通用的方法“将执行返回到(任意)线程”。
对于reactor-netty,一旦您退出了HTTP线程,就应该没有什么理由想要切换回netty线程。一旦发送响应,反应器netty将自然完成。
假设阻塞池类似于Schedulers.boundedElastic()
,您可能确实希望转到Schedulers.parallel()
来限制阻塞线程的生命周期,这是一个非常好的解决方案。
本文向大家介绍node.js回调函数之阻塞调用与非阻塞调用,包括了node.js回调函数之阻塞调用与非阻塞调用的使用技巧和注意事项,需要的朋友参考一下 首先,node.js作为javascript运行平台,它采用了事件驱动和异步编程的方式,通过事件注册和异步函数,开发人员可以提高资源利用率,服务器的性能也能得到改善。其次,对于前端人来说,node.js作为js的运行平台,我们可以通过编写系统级或者
进程是操作系统虚拟出来的概念,用来组织计算机中的任务。但随着进程被赋予越来越多的任务,进程好像有了真实的生命,它从诞生就随着CPU时间执行,直到最终消失。不过,进程的生命都得到了操作系统内核的关照。就好像疲于照顾几个孩子的母亲内核必须做出决定,如何在进程间分配有限的计算资源,最终让用户获得最佳的使用体验。内核中安排进程执行的模块称为调度器(scheduler)。这里将介绍调度器的工作方式。 进程状
我不能让非常基本的底拖示例按预期工作。下面的代码应该允许服务并发HTTP请求。实际发生的情况是,只有一个请求被处理,在第一个请求完成之前,其他请求都无法通过。 增加XNIO线程数及其工作线程不会改变任何事情。一个随机的XNIO被分配给请求。然后在调度后分配一个随机工作者。服务器一直处于阻塞状态,直到请求得到服务。
问题内容: 根据我在这里所读的内容,golang调度程序将自动确定goroutine是否在I / O上阻塞,并将自动切换为在未阻塞的线程上处理其他goroutine。 我想知道的是调度程序如何确定该goroutine已停止阻止I / O。 它是否只是经常进行某种轮询以检查其是否仍处于阻塞状态?是否运行某种后台线程来检查所有goroutine的状态? 例如,如果要在需要5s的goroutine中执行
Storm 现在有 4 种内置的 schedulers(调度器): DefaultScheduler, IsolationScheduler, MultitenantScheduler, ResourceAwareScheduler. Pluggable scheduler(可插拔的调度器) 你可以实现你自己的 scheduler(调度器)来替换掉默认的 scheduler(调度器),自定义分配e
调度器提供了同步递增策略变化的方法。 它应以手工艺等一致性算法为基础,以确保所有执行者的一致性和一致性。 通过调度器用户们可以轻松地建立分布式集群。 调度器的方法分为两部分。 第一种是与Casbin相结合的方法。 这些方法应该在Casbin内部调用。 用户们可以使用由Casbin本身提供的更完整的api。 另一个部分是调度器本身定义的方法,包括调度器初始化方法, 和不同算法提供的不同函数,如动态资