当前位置: 首页 > 知识库问答 >
问题:

RXJava-为什么执行器只使用一个线程

华泽语
2023-03-14

提前谢谢你

public static void main(String[] args) throws InterruptedException {
    long sleepTime = 1000;
    ExecutorService e = Executors.newFixedThreadPool(3);

    Observable.interval(300, TimeUnit.MILLISECONDS)
    .subscribeOn(Schedulers.computation())
    .flatMap(new Func1<Long, Observable<Long>>() {
        @Override
        public Observable<Long> call(Long pT) {
            return Observable.just(pT).subscribeOn(Schedulers.from(e));
        }
    })
    .doOnNext(new Action1<Long>() {

        @Override
        public void call(Long pT) {
            try {
                Thread.sleep(sleepTime);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }

        }
    })
    .subscribe(new Action1<Long>() {

        @Override
        public void call(Long pT) {
            System.out.println("i am " + pT + "in thread:" + Thread.currentThread().getName());

        }
    });


    Thread.sleep(50000);
    e.shutdownNow();

}

日志

i am 0in thread:pool-1-thread-1
i am 1in thread:pool-1-thread-1
i am 2in thread:pool-1-thread-1
i am 3in thread:pool-1-thread-1
i am 4in thread:pool-1-thread-1
i am 5in thread:pool-1-thread-1
i am 6in thread:pool-1-thread-1
i am 7in thread:pool-1-thread-1
i am 8in thread:pool-1-thread-1
i am 9in thread:pool-1-thread-1
i am 10in thread:pool-1-thread-1
i am 11in thread:pool-1-thread-1

共有1个答案

段干子晋
2023-03-14

根据我在你的代码中的理解,生产者生产的速度比订户快。然而,可观察的 间隔(long interval,TimeUnit unit) 实际上不支持反压。文件指出

此操作符不支持背压,因为它使用时间。如果下游需要更慢的时间,它应该降低计时器的速度,或者使用类似{@link#onbackpressuredrop}的方法

如果您的处理确实比生产者慢,那么您可以在订阅者代码中执行如下操作

.subscribe(new Action1<Long>() {

    @Override
    public void call(Long pT) {
        e.submit(new Runnable() {
            System.out.println("i am " + pT + "in thread:" + Thread.currentThread().getName());

        }
    }
});
 类似资料:
  • 我发现这样的php代码: 我希望这个循环会执行4次,因为$I变成了对$的引用(对吗?)。然而,循环只执行一次,并输出: a=10,i=10 我不明白为什么它会这样工作。有什么想法吗?

  • 我编写了代码示例: 每100毫秒提交一个新任务(总任务量-20)。每个任务持续时间-0.5秒。因此,可以并行执行5个任务,最佳执行时间为:20*100 500=2.5秒,池应创建5个线程 但我的实验显示为9.6秒。我打开jsvisualvm查看池创建了多少线程,我看到只创建了一个线程: 请更正我的线程池配置不正确的地方。

  • 我使用的房间用于存储有关购物车详细信息。 我想删除主线程中的记录。我出错了 无法访问主线程上的数据库,因为它可能会长时间锁定 我检查了下面的链接,但没有帮助我 使用room(rxjava)执行删除 如何在android room和rxjava 2中插入数据并获取id作为out参数? 道阿ccess.java 应用班 我的异步任务正在工作 如何在RxJava中实现这一点。因为我的DAO删除返回无效?

  • 我是RxJava的新手,正在尝试从link执行多个观测值的并行执行示例:RxJava并行获取观测值 虽然上面链接中提供的示例是并行执行可观察对象,但是当我在foreach方法中添加一个Thread.sleep(TIME_IN_MILLISECONDS)时,系统开始一次执行一个可观察对象。请帮助我理解为什么Thread.sleep停止可观察对象的并行执行。 下面是导致多个观测值同步执行的修改示例:

  • 有人能解释一下为什么下面的测试失败了吗? 我试图进一步简化“不好”的观察结果,但找不到任何可以删除的东西来简化它。 然而,我目前的理解是,它是一个可观察的(不管它是如何构造的),应该发出一个值,然后完成。然后,我们基于该可观察对象制作两个类似的对象实例,并在那些使用可观察对象的对象上调用一个方法,记下已经这样做了,然后返回Observable.empty()。 有人能解释为什么使用这个可观察的会导

  • 我正在使用最新的Java8 lambdas和并行流处理数据。我的代码如下: 流以一个元素开始,但在第二阶段会添加更多的元素。我的假设是这个流应该并行运行,但在这种情况下只使用一个工作线程。 如果我从2个元素开始(即我在初始列表中添加第二个元素),那么就会产生2个线程来处理流,依此类推...如果我没有显式地将流提交给ForkJoinPool,也会发生这种情况。 问题是:它是记录在案的行为还是可能在实