当前位置: 首页 > 面试题库 >

ForkJoinPool并行度= 1死锁

窦啸
2023-03-14
问题内容

我正在使用jsr166y ForkJoinPool在线程之间分配计算任务。但是我显然一定做错了。

如果创建并行度>
1(默认值为Runtime.availableProcessors();我一直在运行2-8个线程)的ForkJoinPool,我的任务就可以正常工作。但是,如果我创建并行度=
1的ForkJoinPool,则在无法预测的迭代次数后会看到死锁。

是的-设置并行度=
1是一种奇怪的做法。在这种情况下,我要对线程数增加时的并行算法进行性能分析,并且我想将与单个线程一起运行的并行版本与基准串行实现进行比较,以便准确地确定并行实现的开销。

下面是一个简单的示例,它说明了我所遇到的问题。“任务”是在固定数组上进行的虚拟迭代,递归分为16个子任务。

如果以THREADS = 2(或更高)运行,它将可靠地运行到完成,但是如果以THREADS =
1运行,则将始终死锁。经过不可预测的迭代次数后,主循环挂在ForkJoinPool.invoke()中,等待task.join(),然后退出工作线程。

我在Linux下使用JDK 1.6.0_21和1.6.0_22运行,并使用了几天前从Doug
Lea的网站(http://gee.cs.oswego.edu/dl/concurrency-
interest/下载的jsr166y版本index.html)

关于我所缺少的任何建议吗?提前谢谢了。

package concurrent;

import jsr166y.ForkJoinPool;
import jsr166y.RecursiveAction;

public class TestFjDeadlock {

    private final static int[] intArray = new int[256 * 1024];
    private final static float[] floatArray = new float[256 * 1024];

    private final static int THREADS = 1;
    private final static int TASKS = 16;
    private final static int ITERATIONS = 10000;

    public static void main(String[] args) {

        // Initialize the array
        for (int i = 0; i < intArray.length; i++) {
            intArray[i] = i;
        }

        ForkJoinPool pool = new ForkJoinPool(THREADS);

        // Run through ITERATIONS loops, subdividing the iteration into TASKS F-J subtasks
        for (int i = 0; i < ITERATIONS; i++) {
            pool.invoke(new RecursiveIterate(0, intArray.length));
        }

        pool.shutdown();
    }

    private static class RecursiveIterate extends RecursiveAction {

        final int start;
        final int end;

        public RecursiveIterate(final int start, final int end) {
            this.start = start;
            this.end = end;
        }

        @Override
        protected void compute() {

            if ((end - start) <= (intArray.length / TASKS)) {
                // We've reached the subdivision limit - iterate over the arrays
                for (int i = start; i < end; i += 3) {
                    floatArray[i] += i + intArray[i];
                }

            } else {
                // Subdivide and start new tasks
                final int mid = (start + end) >>> 1;
                invokeAll(new RecursiveIterate(start, mid), new RecursiveIterate(mid, end));
            }
        }
    }
}

问题答案:

看起来像ForkJoinPool中的错误。我在该类用法中看到的所有内容都适合您的示例。唯一的其他可能性可能是您的任务之一引发异常并异常死亡(尽管仍应处理)。



 类似资料:
  • 我试图理解Flink中的并行是如何工作的。本文件https://ci.apache.org/projects/flink/flink-docs-release-1.9/concepts/programming-model.html似乎表明水槽的平行度等于1。在我的例子中,我正在向我的接收器中的HBase写信——这是否意味着只有一个任务(线程?)哪个将写入HBase?它是否没有为应用程序设置全局并行

  • 我对和的内部调度机制有点困惑。 同时,显示为不同的,因为它使用了工作窃取算法。如果我理解正确,它意味着一个线程可以从另一个线程窃取一些任务。 然而,我并不真正理解和中实现的机制之间的区别。从我的理解来看,两种机制都应该尽可能减少每个线程的空闲时间。 如果在的情况下,每个线程都有自己的队列,我会理解的。然而,情况并非如此,因为队列是由池的不同线程共享的。

  • 我有一个玩具Flink工作,从3个Kafka主题中读取,然后联合所有这3个流。仅此而已,没有额外的工作。 如果在我的Flink工作中使用parallelism 1,只要我更改parallelism,一切都会很好 为什么它适用于并行1,但不适用于并行 是否与Kafka服务器端设置有关?或者它与我的java代码中的comsumer设置有关(我的代码中还没有特殊的配置)? 我知道这里提供的信息可能不够充

  • 我正在比较算法(前n个数之和)的顺序和并行性能(使用ForkJoinPool): 我试着用不同的NumLoop来获得不同的值,但顺序法总是表现得更好,而且也是按3-4的顺序。 考虑到阵列大小并不是那么小,并行版本在这里的性能不应该更好吗。

  • 在Java8中,可以设置一个定制的forkJoinPool供并行流使用,而不是公共池。 我的问题是它在技术上是如何发生的? 流以任何方式都不知道它被提交给了自定义的forkJoinpool并且没有直接访问它的权限。那么最终如何使用正确的线程来处理流的任务呢? 我试着看源代码,但没有用。我的最佳猜测是在提交时的某个点设置了某个threadLocal变量,然后在稍后由流使用。如果是这样的话,为什么语言