当前位置: 首页 > 知识库问答 >
问题:

具有并行性的Reactor GroupBy在同一线程上运行

弘阳德
2023-03-14

我试图实现每个组的并行性,其中分组元素并行运行,组内每个元素按顺序工作。然而,对于下面的代码,第一个emit使用并行线程,但对于后续emit,它使用一些不同的线程池。如何实现组的并行性和组内元素的顺序执行。

public class ReactorTest implements SmartLifecycle, ApplicationListener<ApplicationReadyEvent> {

    private AtomicInteger counter = new AtomicInteger(1);
    private Many<Integer> healthSink;
    private Disposable dispose;

    private ScheduledExecutorService executor;

    @Override
    public void start() {
        executor = Executors.newSingleThreadScheduledExecutor();
        healthSink = Sinks.many().unicast().onBackpressureBuffer();
        dispose = healthSink.asFlux().groupBy(v -> v % 3 == 0).parallel(10)
                .runOn(Schedulers.newBoundedElastic(10, 100, "k-task")).log().flatMap(v -> v)
                .subscribe(v -> log.info("Data {}", v));
    }

    @Override
    public void stop() {
        executor.shutdownNow();
        if (dispose != null) {
            dispose.dispose();
        }
    }

    @Override
    public boolean isRunning() {
        return executor == null ? false : !executor.isShutdown();
    }

    @Override
    public void onApplicationEvent(ApplicationReadyEvent event) {

        executor.scheduleAtFixedRate(() -> {
            healthSink.tryEmitNext(counter.incrementAndGet());
            healthSink.tryEmitNext(counter.incrementAndGet());
            healthSink.tryEmitNext(counter.incrementAndGet());
        }, 10, 10, TimeUnit.SECONDS);
    }
}

日志

2021-07-27 14:15:34.189  INFO 22212 --- [  restartedMain] i.g.kprasad99.reactor.DemoApplication    : Started DemoApplication in 1.464 seconds (JVM running for 1.795)
2021-07-27 14:15:44.206  INFO 22212 --- [       k-task-1] reactor.Parallel.RunOn.1                 : onNext(UnicastGroupedFlux)
2021-07-27 14:15:44.207  INFO 22212 --- [       k-task-2] reactor.Parallel.RunOn.1                 : onNext(UnicastGroupedFlux)
2021-07-27 14:15:44.207  INFO 22212 --- [       k-task-1] io.github.kprasad99.reactor.ReactorTest  : Data 2
2021-07-27 14:15:44.207  INFO 22212 --- [       k-task-2] io.github.kprasad99.reactor.ReactorTest  : Data 3
2021-07-27 14:15:44.207  INFO 22212 --- [       k-task-1] io.github.kprasad99.reactor.ReactorTest  : Data 4
2021-07-27 14:15:54.200  INFO 22212 --- [pool-3-thread-1] io.github.kprasad99.reactor.ReactorTest  : Data 5
2021-07-27 14:15:54.200  INFO 22212 --- [pool-3-thread-1] io.github.kprasad99.reactor.ReactorTest  : Data 6
2021-07-27 14:15:54.200  INFO 22212 --- [pool-3-thread-1] io.github.kprasad99.reactor.ReactorTest  : Data 7
2021-07-27 14:16:04.195  INFO 22212 --- [pool-3-thread-1] io.github.kprasad99.reactor.ReactorTest  : Data 8
2021-07-27 14:16:04.195  INFO 22212 --- [pool-3-thread-1] io.github.kprasad99.reactor.ReactorTest  : Data 9
2021-07-27 14:16:04.195  INFO 22212 --- [pool-3-thread-1] io.github.kprasad99.reactor.ReactorTest  : Data 10
2021-07-27 14:16:14.206  INFO 22212 --- [pool-3-thread-1] io.github.kprasad99.reactor.ReactorTest  : Data 11
2021-07-27 14:16:14.206  INFO 22212 --- [pool-3-thread-1] io.github.kprasad99.reactor.ReactorTest  : Data 12
2021-07-27 14:16:14.206  INFO 22212 --- [pool-3-thread-1] io.github.kprasad99.reactor.ReactorTest  : Data 13
2021-07-27 14:16:24.197  INFO 22212 --- [pool-3-thread-1] io.github.kprasad99.reactor.ReactorTest  : Data 14
2021-07-27 14:16:24.197  INFO 22212 --- [pool-3-thread-1] io.github.kprasad99.reactor.ReactorTest  : Data 15
2021-07-27 14:16:24.197  INFO 22212 --- [pool-3-thread-1] io.github.kprasad99.reactor.ReactorTest  : Data 16
2021-07-27 14:16:34.196  INFO 22212 --- [pool-3-thread-1] io.github.kprasad99.reactor.ReactorTest  : Data 17
2021-07-27 14:16:34.196  INFO 22212 --- [pool-3-thread-1] io.github.kprasad99.reactor.ReactorTest  : Data 18
2021-07-27 14:16:34.196  INFO 22212 --- [pool-3-thread-1] io.github.kprasad99.reactor.ReactorTest  : Data 19
2021-07-27 14:16:44.201  INFO 22212 --- [pool-3-thread-1] io.github.kprasad99.reactor.ReactorTest  : Data 20
2021-07-27 14:16:44.201  INFO 22212 --- [pool-3-thread-1] io.github.kprasad99.reactor.ReactorTest  : Data 21
2021-07-27 14:16:44.201  INFO 22212 --- [pool-3-thread-1] io.github.kprasad99.reactor.ReactorTest  : Data 22
2021-07-27 14:16:54.201  INFO 22212 --- [pool-3-thread-1] io.github.kprasad99.reactor.ReactorTest  : Data 23

共有1个答案

东方富
2023-03-14

你需要把<代码>。平行(…) 在<代码>之后。平面图(…) 运算符

Flux.interval(Duration.ofMillis(100))
  .take(10)
  .groupBy(v -> v % 2 == 0)
  .flatMap(f -> f)
  .parallel(2)
  .runOn(Schedulers.newBoundedElastic(2, 10, "k-task"))
  .subscribe(i -> log.info("Data {}", i));

结果:

10:32:33.377 [k-task-1] INFO  Data 0
10:32:33.466 [k-task-2] INFO  Data 1
10:32:33.562 [k-task-1] INFO  Data 2
10:32:33.673 [k-task-2] INFO  Data 3
10:32:33.766 [k-task-1] INFO  Data 4
10:32:33.860 [k-task-2] INFO  Data 5
10:32:33.971 [k-task-1] INFO  Data 6
10:32:34.065 [k-task-2] INFO  Data 7
10:32:34.163 [k-task-1] INFO  Data 8
10:32:34.268 [k-task-2] INFO  Data 9
 类似资料:
  • 我们都知道Java会彻底优化我们的代码,我们都喜欢它。嗯,大多数时候。下面是一段让我头疼的代码: 在快速、慢速的单线程和多线程处理器之间,结果可能会有所不同。在我测试的计算机上(没有doSomething),输出如下: CompareThread的前几次迭代运行良好,然后Java进行了“优化”:testValue和currentValue总是相等的,并且不断地改变它们的值,尽管线程从未离开最内层的

  • 我正在运行RxJava并创建一个主题以使用方法生成数据。我正在使用Spring。 这是我的设置: 在RxJava流上生成新数据的方式是通过Autowire private SubjectObserver SubjectObserver,然后调用SubjectObserver。发布(newDataObjGenerated) 无论我为subscribeOn()指定了什么 Schedulers.io()

  • 我试着运行一个程序,使用线程显示带有数字的乘法、除法、加法和减法表。 但是我希望数字被乘以或相加等。由用户选择。 也就是说,程序应该在用户为每个操作选择一个数字后运行,然后显示结果。

  • 我正在尝试用Mono中的值填充Flux中的对象。当我尝试这样做时,它只是忽略了我的“设置”操作。我假设这是因为Flux正在并行工作,而Mono没有。我该如何解决这个问题? 以下是一些日志 如您所见,我正在尝试将国家代码设置为代理。

  • 以下是我面临的问题的简短代码版本: 这是我得到的输出: 我很惊讶地看到在那里!当我取消注释调用,甚至取消注释单个语句时,我预计会发生这样的事情: 我理解将确保它不会在线程上运行,但是我想避免将供应商返回的数据从运行的线程传递到将要运行的线程然后在链中应用其他后续的代码。

  • 我创建了两个类来计算数字,其中一个类同步执行,另一个类将其分成两半,并在两个线程中执行这两半。(intel i5(4个CPU),8GB ram)代码如下: 公共类: 多线程执行:public class Sheet2{