当前位置: 首页 > 知识库问答 >
问题:

在Spring Webflow中执行阻塞JDBC调用

华季同
2023-03-14

我正在使用Spring Webflux和Spring数据jpa,使用PostgreSql作为后端数据库。我不想在进行诸如查找和保存之类的db调用时阻塞主线程。为了实现同样的目标,我在Controller类中有一个主调度器,在服务类中有一个jdbcScheduler。

我定义它们的方式是:

@Configuration
@EnableJpaAuditing
public class CommonConfig {

    @Value("${spring.datasource.hikari.maximum-pool-size}")
    int connectionPoolSize;

    @Bean
    public Scheduler scheduler() {
        return Schedulers.parallel();
    }

    @Bean
    public Scheduler jdbcScheduler() {
        return Schedulers.fromExecutor(Executors.newFixedThreadPool(connectionPoolSize));
    }

    @Bean
    public TransactionTemplate transactionTemplate(PlatformTransactionManager transactionManager) {
        return new TransactionTemplate(transactionManager);
    }
}

现在,当在我的服务层中进行获取/保存调用时,我这样做:

    @Override
    public Mono<Config> getConfigByKey(String key) {
        return Mono.defer(
            () -> Mono.justOrEmpty(configRepository.findByKey(key)))
            .subscribeOn(jdbcScheduler)
            .publishOn(scheduler);
    }

    @Override
    public Flux<Config> getAllConfigsAfterAppVersion(int appVersion) {
        return Flux
            .fromIterable(configRepository.findAllByMinAppVersionIsGreaterThanEqual(appVersion))
            .subscribeOn(jdbcScheduler)
            .publishOn(scheduler);
    }

    @Override
    public Flux<Config> addConfigs(List<Config> configList) {
        return Flux.fromIterable(configRepository.saveAll(configList))
            .subscribeOn(jdbcScheduler)
            .publishOn(scheduler);
    }

在控制器中,我执行以下操作:

    @PostMapping
    @ResponseStatus(HttpStatus.CREATED)
    Mono<ResponseDto<List<Config>>> addConfigs(@Valid @RequestBody List<Config> configs) {
        return configService.addConfigs(configs).collectList()
            .map(configList -> new ResponseDto<>(HttpStatus.CREATED.value(), configList, null))
            .subscribeOn(scheduler);
    }

这是正确的吗?和/或有更好的方法吗?

我的理解是:

.subscribeOn(jdbcScheduler)
.publishOn(scheduler);

该任务是否将在jdbcScheduler线程上运行,稍后的结果将在我的主并行调度程序上发布。这种理解正确吗?

共有2个答案

左劲
2023-03-14

恕我直言,有一种方法可以更好地利用机器资源来执行此操作。在留档后,您可以将调用包装在其他线程中,这样您就可以继续执行。

卢文博
2023-03-14

您对发布和订阅的理解是正确的(请参阅Reactor项目中有关这些操作员的参考文档)。

如果调用阻塞库而不调度在特定调度器上工作的线程,那么这些调用将阻塞少数可用线程中的一个线程(默认情况下,Netty事件循环),并且应用程序将只能同时服务少数请求。

现在我不知道你这样做是想达到什么目的。

首先,并行调度程序是为受CPU限制的任务而设计的,这意味着您将拥有很少的任务,与CPU内核一样多(或更多)。在这种情况下,这就像将您的线程池大小设置为常规Servlet容器上的内核数量。您的应用程序将无法处理大量并发请求。

即使您选择了更好的替代方案(如弹性调度器),它仍然不如Netty事件循环好,Netty事件循环是在Spring WebFlux中本机调度请求处理的地方。

如果您的最终目标是性能和可伸缩性,那么在反应式应用程序中包装阻塞调用的性能可能会比常规Servlet容器差。

您可以使用Spring MVC和:

  • 在处理阻塞库时使用常用的阻塞返回类型,如JPA
  • 当您不受这些库的限制时,请使用Mono和Flux返回类型

这不会是非阻塞的,但它仍然是异步的,您将能够并行完成更多的工作,而无需处理复杂性。

 类似资料:
  • 如另一个问题中所述,当使用Undertow时,所有处理都应该在专用的工作线程池中完成,如下所示: 我知道可用于显式地告诉Undertow在专用的线程池中调度请求以阻止请求。我们可以通过将包装在实例中来修改上面的示例,如下所示: 调用此方法将exchange置于阻塞模式,并创建一个BlockingHttpExchange对象来存储流。当交换处于阻塞模式时,输入流方法变得可用,除了阻塞和非阻塞模式之间

  • 我目前正在制作一个微服务来为我们的自动化测试环境创建测试用户。该数据库可以通过另一个API访问,因此为了创建测试用户,我需要执行对该API的调用。 应创建测试用户,然后在执行测试后处理。测试用户的标识符是SSN(国家标识符数字),每个公民都是唯一的。我的API/microservice使用生成的SSN生成一个新用户,并应通过API将其发布到数据库,以控制数据库的后端服务。此后端服务不是被动的。 问

  • Vext.x核心手册建议使用< code>executeBlocking()执行阻塞代码,以防止事件循环被阻塞。尽管如此,它还指出: 阻止代码 [] 应阻止合理的时间(即不超过几秒钟)。长阻塞操作...被排除在外。当阻止操作持续超过10秒时,将在控制台上打印一条消息[...]。长阻塞操作应使用由应用程序管理的专用线程,该线程可以使用事件总线或 runOnContext 与顶点进行交互 所以我不能在

  • 本文向大家介绍了解节点中代码执行的阻塞和解除阻塞,包括了了解节点中代码执行的阻塞和解除阻塞的使用技巧和注意事项,需要的朋友参考一下 现在,我们在fs模块中具有文件写入功能writeFileSync,如下所示- 同步表示已同步。这是一个阻塞代码示例。一旦文件写入完成,则仅对其余文件执行代码。上面的代码比较简单,但是如果我们进行大量的文件处理操作,将会导致应用性能下降。 这种代码执行方式将减慢其他请求

  • 我完全混淆了,,。 哪个是阻塞,哪个不是? 我的意思是如果我使用父进程是否等待子进程返回/才继续执行。 如何影响这些调用?

  • 本文向大家介绍node.js回调函数之阻塞调用与非阻塞调用,包括了node.js回调函数之阻塞调用与非阻塞调用的使用技巧和注意事项,需要的朋友参考一下 首先,node.js作为javascript运行平台,它采用了事件驱动和异步编程的方式,通过事件注册和异步函数,开发人员可以提高资源利用率,服务器的性能也能得到改善。其次,对于前端人来说,node.js作为js的运行平台,我们可以通过编写系统级或者