当前位置: 首页 > 知识库问答 >
问题:

Flux的方法publishOn没有按预期工作

季骏祥
2023-03-14

我正试图将一个阻塞消费者集成为Reactor铝-SR1中的助焊剂订户。我想使用一个并行调度器,并发地执行阻塞操作。

我实现了一个主类来描述我意图:

package etienne.peiniau;

import org.reactivestreams.Subscriber;
import org.reactivestreams.Subscription;
import reactor.core.publisher.Flux;
import reactor.core.scheduler.Schedulers;
import reactor.util.function.Tuple2;

public class Main {

    public static void main(String[] args) throws InterruptedException {
        Flux.just(1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15, 16, 17, 18, 19, 20)
                .elapsed()
                .publishOn(Schedulers.parallel())
                .subscribe(new Subscriber<Tuple2<Long, Integer>>() {
                    @Override
                    public void onSubscribe(Subscription subscription) {
                        System.out.println("[" + Thread.currentThread().getName() + "] Subscription");
                        subscription.request(Long.MAX_VALUE);
                    }

                    @Override
                    public void onNext(Tuple2<Long, Integer> t2) {
                        System.out.println("[" + Thread.currentThread().getName() + "] " + t2);
                        try {
                            Thread.sleep(1000); // long operation
                        } catch (InterruptedException e) {
                            e.printStackTrace();
                        }
                    }

                    @Override
                    public void onError(Throwable throwable) {
                        System.err.println("[" + Thread.currentThread().getName() + "] Error: " + throwable.getMessage());
                    }

                    @Override
                    public void onComplete() {
                        System.out.println("[" + Thread.currentThread().getName() + "] Complete");
                    }
                });
        // Waiting for the program to complete
        System.out.println("[" + Thread.currentThread().getName() + "] Main");
        Thread.sleep(100000);
    }

}
[main] Subscription
[main] Main
[parallel-1] [3,1]
[parallel-1] [1000,2]
[parallel-1] [1001,3]
[parallel-1] [1000,4]
[parallel-1] [1000,5]
[parallel-1] [1000,6]
[parallel-1] [1001,7]
[parallel-1] [1000,8]
[parallel-1] [1000,9]
[parallel-1] [1000,10]
[parallel-1] [1000,11]
[parallel-1] [1001,12]
[parallel-1] [1000,13]
[parallel-1] [1000,14]
[parallel-1] [1000,15]
[parallel-1] [1000,16]
[parallel-1] [1001,17]
[parallel-1] [1000,18]
[parallel-1] [1000,19]
[parallel-1] [1000,20]
[parallel-1] Complete

共有1个答案

郑嘉年
2023-03-14

实际上,它如预期的那样工作。您可以看到,在并行运行时间内处理的所有值几乎相同,但您总是在相同的线程中接收元素,并且每次等待1秒时都是这样。

我想在简单的fluxparallel并不意味着更多的线程,它意味着并行工作。例如,如果您运行如下代码:

Flux.fromIterable(IntStream.range(0, 20).boxed().collect(Collectors.toList()))
            .map(i -> {
                System.out.println("map [" + Thread.currentThread().getName() + "] " + i);
                return i;
            })
            .elapsed()
            .publishOn(Schedulers.single())
            .subscribeOn(Schedulers.single())
            .subscribe(t2 -> {
                System.out.println("subscribe [" + Thread.currentThread().getName() + "] " + t2);
            });

您将看到结果:

map [single-1] 0
map [single-1] 1
...
subscribe [single-1] [4,0]
subscribe [single-1] [0,1]
...
map [single-1] 3
subscribe [parallel-1] [5,0]
map [single-1] 4
subscribe [parallel-1] [0,1]
map [single-1] 5
...
Flux.fromIterable(IntStream.range(0, 20).boxed().collect(Collectors.toList()))
        .elapsed()
        .parallel()
        .runOn(Schedulers.parallel())
        .subscribe(t2 -> {
            System.out.println("[" + Thread.currentThread().getName() + "] " + t2);
            try {
                Thread.sleep(1000); // long operation
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }, throwable -> {
            System.err.println("[" + Thread.currentThread().getName() + "] Error: " + throwable.getMessage());
        }, () -> {
            System.out.println("[" + Thread.currentThread().getName() + "] Complete");
        }, subscription -> {
            System.out.println("[" + Thread.currentThread().getName() + "] Subscription");
            subscription.request(Long.MAX_VALUE);
        });
[parallel-1] [8,0]
[parallel-2] [0,1]
[parallel-3] [0,2]
[parallel-4] [0,3]
[parallel-1] [0,4]
...

因此它只使用很少的线程来处理结果。在我看来,这确实是平行的。

还要注意,如果使用方法.subscribe(subscriber s),所有结果都将以顺序方式使用,若要并行使用它们,应使用:

public void subscribe(Consumer<? super T> onNext, Consumer<? super Throwable>
            onError, Runnable onComplete, Consumer<? super Subscription> onSubscribe)

或任何其他具有使用者<的重载方法?超级t>onNext,...参数

 类似资料:
  • 我在使用R的group_by和SUMMARY函数时遇到了一些问题,我想知道你们是否可以帮我一些忙。我有一张类似的表格: 我试图使用dplyr的group_by和SUMMARY来找到频率列的平均值。下面是我的示例代码: 我所期望的是,一个表格被吐出来,分解按单个类别分组的平均频率,如下所示: 但是,我收到的是一个按类别分组的表,每个类别接收整个表的平均值,如下所示: 有什么线索吗?我应该说我是初学者

  • 我已经配置了log4j2.xml文件,application.log文件将被创建,它应该每天翻转。 但是在JVM中,applicatoin.log文件在10MB之后会翻转,如果翻转三次,第一个文件会被覆盖。也就是说我随时都application.logapplication-2020-10-16.log.zip. 为什么log4j2(v2.13)即使配置为每日,也会每10MB滚动一次文件?任何在l

  • 我有一个用例,我需要使用Kafka进行批处理。假设在1分钟内有大约100个请求,我不想立即发布每个请求,而是想将所有100个请求分批发布到topic一次。 但是使用以下配置,批处理不会发生,一旦发送消息,它就会发布到主题并同时在消费者中接收 生产者配置 消费者配置 在这里,我设置了 linger.ms = 60000,根据我的理解,如果 linger.ms 设置为某个值,那么即使发送方线程更早变得

  • 根据以下文档:http://testng.org/doc/documentation-main.html “如果测试类上有一个方法tearDown(),那么在每个测试方法之前和之后都会调用它”(JUnit 3) 我使用的是JUnit版本:3.8.1 这里的这个是在我的测试之前运行的,但我希望它会在测试之后。它在测试后没有运行: 我可以通过将第一行更改为: 但是,下一个类(下面)的表示法与第一个类完

  • 以下switch语句具有奇怪的行为: 我认为,当cat命令在被执行时失败时,“after cat”将被写入,而||之后的部分将被执行。但是,当我查看输出时,似乎在回显“后猫”后会发生中断,因此实际状态不会改变,将再次进入。然后stty也会失败(因为串行适配器丢失)。之后,cat命令againt在开始时失败,但现在进入“catch”块。。。。 下面是相关的输出: 我做错了什么?

  • 现在,在我的drools项目中,我在单独的DRL文件中有两组规则,它们由议程组分割。对于议程组“preCheck”,我将该议程组中的每个规则的自动聚焦设置为true。例子: 对于另一个议程组-“default规则”-规则没有设置自动焦点属性。示例: 在通过RESTAPI调用规则时,我还试图通过JSON负载将焦点设置为“preCheck”议程组。例子: 然而,在执行规则时,似乎首先要评估“defau