当前位置: 首页 > 知识库问答 >
问题:

SubscribeOn或PublishOn不使用ReactiveCassandraCrudRepository

丁曦哲
2023-03-14

我们在Spring WebFlux中使用反应性Spring数据存储库,我对SubscribeOn的理解是,它决定SubscribeOn之前的操作符将在flux中执行哪个线程池,而PublishOn则决定订阅将在哪个线程池上执行。然而,在下面的代码中,即使使用PublishOn和SubscribeOn,代码也不会在主线程上执行,而是返回到cluster-nio-worker-1。

    System.out.println("Current Thread :- "+Thread.currentThread().getName()); //Current Thread :- main
    personRepository.findAll().log()
            .map(document -> mapDocumentToSomethingElse(document)) //Current thread cluster-nio-worker-1
            .subscribeOn(Schedulers.immediate())
            .publishOn(Schedulers.immediate())
            .subscribe(trackingevent -> System.out.println("Got Item "+item +" inside thread "+Thread.currentThread()), //Thread[cluster-nio-worker-1,5,main]
                    excp -> excp.printStackTrace(),
                    () -> System.out.println("Completed processing Thread:- "+Thread.currentThread().getName())); //cluster-nio-worker-1

另外,thread[cluster-nio-worker-1,5,main]是什么意思?为什么这些方法调用不使用主线程执行。

共有1个答案

颛孙沈义
2023-03-14

subscribeOn方法使发布服务器使用给定的线程池来发布值。管道中可能有N个subscribeon方法。最衣着的就会生效。PersonRepository.findAll().log()是一个包装器,返回一个流量。因此,如果它在内部使用任何调度程序,那么您就不能使用subscribeon更改它。例如,interval方法使用parallel并且我不能将其更改为boundedElastic(如图所示)。

    Flux.interval(Duration.ofSeconds(1))
            .subscribeOn(Schedulers.boundedElastic())

schedulers.immediate只是将管道执行保持在同一线程中。它不是主的,在您的示例中,它将是cluster-nio-worker线程池。

我们可以从主线程切换到任何调度器线程池。但是我们不能将执行切换回主线程。它不是工程Reactor的限制。它应该是Java本身的一个限制。

 类似资料:
  • subscribeOn 指定 Observable 在那个 Scheduler 执行 ReactiveX 使用 Scheduler 来让 Observable 支持多线程。你可以使用 subscribeOn 操作符,来指示 Observable 在哪个 Scheduler 执行。 observeOn 操作符非常相似。它指示 Observable 在哪个 Scheduler 发出通知。 默认情况下,

  • 我知道subscribeOn在序列被subscribe时会切换执行线程,但我发现它不能与ServerRequest.bodyTomono/flux一起工作 类似于 将使执行线程更改 但让我困惑的是 感谢@MichaelBerry@SimonBaslé,你们两个帮了我很大的忙 简而言之,reactor-netty将覆盖http订阅的subscribeOn,使用在不同的或上包含单独的可以完成我想要的工

  • 我理解在反应流中使用阻塞操作时,我们应该使用

  • 我们学到了如何在一个调度器上运行一个任务。但是我们如何利用它来和Observables一起工作呢?RxJava提供了subscribeOn()方法来用于每个Observable对象。subscribeOn()方法用Scheduler来作为参数并在这个Scheduler上执行Observable调用。 在“真实世界”这个例子中,我们调整loadList()函数。首先,我们需要一个新的getApps(

  • 我试图在不同的IO线程中运行块和块。但是输出是这样的: 如您所见,调用似乎对流没有影响,引用调用方法的线程。 另外,请考虑下面的代码: 这些代码的问题是什么?谢了。

  • 我在使用RxJava concat运算符时遇到了一个问题。我有两个可观察项,第一个从服务器数据库发出结果,另一个从本地数据库发出结果,然后我将: 因此,这给我带来了问题,因为我没有使用连接的observable是在上运行的,这不运行远程,它启动了。 如果我实现了,我将从不正确的线程获得上访问,因为Observable当然不在存在领域实例的线程上运行。 我已经搜索了其他问题,但没有得到任何有用的信息