当前位置: 首页 > 知识库问答 >
问题:

Spring Webflux项目Reactor中的调度器

孟祯
2023-03-14

Project Reactor很棒,我可以很容易地切换一个线程来处理另一个线程上的某些部分,但我已经查看了schedulers.fromExecutorService()方法,该方法每次都会分配新的ExecutorService。因此,当调用此方法时,总是会创建调度器并再次分配调度器。我不确定,但我认为这可能会导致内存泄漏...


Mono<String> sometext() {
return Mono
.fromCallable(() -> "" )
.subscribeOn(Schedulers.newParallel("my-custom));
}

我想知道如何将调度器注册为bean,它单例所以只会分配一次,而不是每次都在构造函数中创建他。很多博客都是这样解释线程模型的。

...
private final Scheduler scheduler = Schedulers.newParallel("my-custom);
..
Mono.fromCallable(() -> "" ).subscribeOn(scheduler)

共有1个答案

凌嘉志
2023-03-14

schedulers.newparallel()确实会在每次调用它时创建一个新的调度器,它带有一个相关的backed threadpool-所以是的,您是正确的,如果您使用该方法,那么您希望确保在某个地方存储对它的引用,以便可以重用它。简单地提供相同的name参数不仅会检索新的调度器,还会创建一个具有相同名称的不同调度器。

如何做到这一点取决于您--可以通过spring bean(只要它是单例而不是原型bean!)、字段或其他最适合您的用例的方法。

但是,在所有这些之前,我首先考虑您是否确实需要创建一个单独的并行调度器。schedulers.parallex()调度器是一个默认的并行调度器,可用于并行计算tin(它不会在每次调用时创建一个新的调度器),除非您出于某种原因需要为不同的服务单独配置并行调度器,否则最好的做法就是使用它。

 类似资料:
  • 我正在使用spring webflux通过Schudters.elastic()从另一个服务调用另一个服务。 在主线程中,我设置了一个InhertitableThreadlocal变量,在子线程中,我试图访问它,它工作正常。 这是我的线程本地存储类 现在的问题是它第一次工作正常,这意味着我在threadlocal中设置的值被传递给了其他服务。 但第二次,它也使用了旧id(在上次请求中生成)。 我尝

  • 我在学习Reactor,我想知道如何实现某种行为。假设我有一个传入消息流。每条消息都与某个实体关联,并包含一些数据。 与不同实体相关的消息可以并行处理。但是,与任何单个实体相关的消息必须一次处理一条,即在实体“abc”的消息1处理完成之前,无法开始处理实体“abc”的消息2。在处理消息的过程中,应该缓冲该实体的进一步消息。其他实体的消息可以不受阻碍地进行。可以将其视为每个实体的线程上都有这样的代码

  • 请忽略这个问题。我有错误的设置,导致gRPC的性能不佳。 是否可以比较GRPC与项目Reactor? 我只是想比较REST和GRPC的性能。我看不出GRPC比Reactor快。事实上,它更糟。 GRPC设置: 此 grpc 服务器使用服务器端流对来自 api 服务器的每个请求响应 1000 个“Hello”。 api 服务器返回

  • 就而言,它们之间的主要区别是什么?从文档中,我了解到是异步操作,而是同步操作。但这对我来说并没有什么意义,B/C单声道是关于并行性的,这一点是不可理解的。有人能用一种更容易理解的方式来重新表述吗? 然后在的文档中声明(https://projectreactor.io/docs/core/release/api/reactor/core/publisher/mono.html#FlatMap-ja

  • 反应器中的项目包含循环引用:Vertex{tag='org.spigotmc: spiget-api: 1.8.8-R0.1-SNAPSHOT'}'和'Vertex{tag='org.spigotmc: spigot: 1.8.8-R0.1-SNAPSHOT'}'之间的边缘在图中引入循环org.spigotmc: spigot: 1.8.8-R0.1-SNAPSHOT- “Vertex { lab

  • 项目Reactor3.1.5。发布 考虑这一点: 我希望订阅服务器在多个线程中运行,但它只在一个线程中运行: 留档告诉我的期望是正确的(http://projectreactor.io/docs/core/release/reference/#threading)。有人能给我解释一下那里发生了什么吗?