当前位置: 首页 > 知识库问答 >
问题:

为什么在嵌套流中使用computation()调度器会导致死锁?

须巴英
2023-03-14

最近我在写一些复杂的基于RX的流程,发现它总是在特定情况下产生死锁。我花了几个小时才找出问题所在,似乎可以在这个简单的示例中重现:

public static void main(String[] args) throws InterruptedException {
    Observable.just(1, 2, 3, 4, 5, 6, 7, 8, 9)
        .flatMap(x -> Observable.fromIterable(produceMultiple(x)))
        .subscribeOn(Schedulers.computation())
        .subscribe(System.out::println);

    Thread.sleep(50_000);
}

private static List<Integer> produceMultiple(int x) {
    return Observable.range(1, x)
        .flatMap(y -> Observable.fromCallable(() -> {
                Thread.sleep(1000);
                return 10 * x + y;
            }).subscribeOn(Schedulers.computation())
        ).toList().blockingGet();
}

此程序应打印以下值:11、21、22、31、32、33、。。。,通常,值可以表示为XY。每组X中的值的顺序可以是随机的,但组的顺序应该是升序。如果previous仍在计算,则不应发出新组(这是我的原始情况)。

问题是,如果您运行这段代码,您将只看到前几个元素的输出-我认为它与处理器的数量相关,因为computation()调度器使用固定的线程池大小。

你知道为什么会这样吗?这看起来很奇怪,因为主链的数字(1,2,3,…)在池中的单个线程上处理,并且每次produceMultiple使用blockingGet()完成其作业时,其他线程都应该是空闲的。

如果您修改任何提供不同调度器的subscribeOn()(或者一个是computation(),另一个不同),一切正常。此外,如果我从自定义线程池执行器(绑定到4个线程)创建调度程序,它仍然可以工作!

共有1个答案

阴飞星
2023-03-14

不要在计算调度器上使用blockingGet,因为您可能会阻止代码其他部分使用的支持线程。您甚至不必在代码中使用它,只需从produceMultiple返回可观察的:

static void main(String[] args) throws InterruptedException {
    Observable.just(1, 2, 3, 4, 5, 6, 7, 8, 9)
        .flatMap(x -> produceMultiple(x))
        .subscribeOn(Schedulers.computation())
        .subscribe(System.out::println);

    Thread.sleep(50_000);
}

static Observable<Integer> produceMultiple(int x) {
    return Observable.range(1, x)
        .flatMap(y -> Observable.fromCallable(() -> {
                Thread.sleep(1000);
                return 10 * x + y;
            }).subscribeOn(Schedulers.computation())
        );
}
 类似资料:
  • 请问上述代码的第70行换成第71行注释的内容时,为什么会造成死循环。 题目: https://www.acwing.com/problem/content/174/

  • 我遇到了一个奇怪的情况,在静态初始化器中使用带有lambda的并行流似乎永远不会占用CPU。代码如下: 这似乎是该行为的最小再现测试用例。如果我: null 我使用的是OpenJDK版本1.8.0_66-internal。

  • 问题内容: 我敢肯定对这种琐碎的情况有一个简单的解释,但是我对并发模型是陌生的。 当我运行这个例子 我收到此错误: 为什么呢 包装成一个使示例按预期运行 再次,为什么? 请,我需要深入的解释,而不仅仅是如何消除死锁并修复代码。 问题答案: 从文档中: 如果通道未缓冲,则发送方将阻塞,直到接收方收到该值为止。如果通道具有缓冲区,则发送方仅阻塞该值,直到将值复制到该缓冲区为止;否则,发送方才阻塞。如果

  • 问题内容: 注意: 这是不是重复,请仔细阅读题目 сarefully报价: 真正的问题是为什么代码有时在不应该运行的情况下仍然有效。即使没有lambda,该问题也会重现。这使我认为可能存在JVM错误。 在http://codingdict.com/questions/122889的评论中,我试图找出原因,导致代码行为从一个起点到另一个起点有所不同,而该讨论的参与者为我提供了一个建议,以创建一个单独

  • 问题内容: 什么时候嵌套类可行?我看到的最常见的优点是“共享范围”(跨类使用变量)。 这是否比仅将嵌套类放入其自己的文件中并通过构造函数传递参数更具吸引力/最佳实践少? 问题答案: 使用嵌套类的原因有很多,其中包括: 这是一种对仅在一个地方使用的类进行 逻辑分组 的方法。 它增加了 封装 。 嵌套类可以导致更具 可读性和可维护性的代码 。 子级到父级的连接更为简单,因为它 直观地说明 了每个类的变

  • 问题内容: 好吧,我试图理解并阅读可能导致它的原因,但我却无法理解: 我的代码中有这个地方: 事实是,当它尝试调用某些方法时,它将引发而不是其他预期的异常(特别是)抛出 。我实际上知道调用了什么方法,所以我直接转到该方法代码,并为应该抛出的行添加了一个块 ,它实际上按预期抛出。然而,当它上升时,以某种方式更改了上面的代码并没有 按预期进行。 是什么原因导致这种行为的?我该如何检查? 问题答案: 通