当前位置: 首页 > 面试题库 >

我可以使用ForkJoinPool的工作窃取行为来避免线程饥饿死锁吗?

叶衡虑
2023-03-14
问题内容

一个 线程死锁饥饿 如果池中的所有线程都在等待在同一池中,以完成队列任务发生在一个正常的线程池。
ForkJoinPool通过从join()调用内部的其他线程中窃取工作来避免此问题,而不仅仅是等待。例如:

private static class ForkableTask extends RecursiveTask<Integer> {
    private final CyclicBarrier barrier;

    ForkableTask(CyclicBarrier barrier) {
        this.barrier = barrier;
    }

    @Override
    protected Integer compute() {
        try {
            barrier.await();
            return 1;
        } catch (InterruptedException | BrokenBarrierException e) {
            throw new RuntimeException(e);
        }
    }
}

@Test
public void testForkJoinPool() throws Exception {
    final int parallelism = 4;
    final ForkJoinPool pool = new ForkJoinPool(parallelism);
    final CyclicBarrier barrier = new CyclicBarrier(parallelism);

    final List<ForkableTask> forkableTasks = new ArrayList<>(parallelism);
    for (int i = 0; i < parallelism; ++i) {
        forkableTasks.add(new ForkableTask(barrier));
    }

    int result = pool.invoke(new RecursiveTask<Integer>() {
        @Override
        protected Integer compute() {
            for (ForkableTask task : forkableTasks) {
                task.fork();
            }

            int result = 0;
            for (ForkableTask task : forkableTasks) {
                result += task.join();
            }
            return result;
        }
    });
    assertThat(result, equalTo(parallelism));
}

但是,使用到的ExecutorService接口时ForkJoinPool,似乎不会发生窃取工作的情况。例如:

private static class CallableTask implements Callable<Integer> {
    private final CyclicBarrier barrier;

    CallableTask(CyclicBarrier barrier) {
        this.barrier = barrier;
    }

    @Override
    public Integer call() throws Exception {
        barrier.await();
        return 1;
    }
}

@Test
public void testWorkStealing() throws Exception {
    final int parallelism = 4;
    final ExecutorService pool = new ForkJoinPool(parallelism);
    final CyclicBarrier barrier = new CyclicBarrier(parallelism);

    final List<CallableTask> callableTasks = Collections.nCopies(parallelism, new CallableTask(barrier));
    int result = pool.submit(new Callable<Integer>() {
        @Override
        public Integer call() throws Exception {
            int result = 0;
            // Deadlock in invokeAll(), rather than stealing work
            for (Future<Integer> future : pool.invokeAll(callableTasks)) {
                result += future.get();
            }
            return result;
        }
    }).get();
    assertThat(result, equalTo(parallelism));
}

粗略地看一下ForkJoinPool的实现,所有常规ExecutorServiceAPI都是使用ForkJoinTasks
实现的,因此我不确定为什么会发生死锁。


问题答案:

您几乎要回答自己的问题。解决方案是声明
ForkJoinPool通过从join()调用内部的其他线程中窃取工作来避免此问题”。只要线程由于某些其他原因而被阻塞(除外)ForkJoinPool.join(),就不会发生窃取工作的情况,线程只是等待而什么也不做。

原因是在Java中,无法ForkJoinPool阻止其线程阻塞,而是给它们提供其他处理方法。线程 本身
需要避免阻塞,而是要求池进行应做的工作。并且这仅在该ForkJoinTask.join()方法中实现,而不能在任何其他阻塞方法中实现。如果在Future内部使用ForkJoinPool,您还将看到饥饿死锁。

为什么仅ForkJoinTask.join()在Java
API中的任何其他阻止方法中而不是在其他方法中实现工作窃取?嗯,有很多这样的阻塞方法(Object.wait()Future.get()中的任何并发原语java.util.concurrent,I
/
O方法等),它们与无关ForkJoinPool,后者只是API中的任意类,因此在所有情况下都添加了特殊情况这些方法将是不好的设计。这也可能导致非常令人惊讶和不希望的后果。假设有一个用户将一个任务传递给一个ExecutorService等待一个的Future,然后发现该任务挂起的时间很长,Future.get()原因是正在运行的线程偷走了其他一些(长时间运行的)工作项,而不是等待Future并在结果可用后立即继续。一旦线程开始处理另一个任务,它就不能返回到原始任务,直到第二个任务完成为止。因此,其他阻止方法不进行工作窃取实际上是一件好事。对于a
ForkJoinTask,这个问题不存在,因为不重要的是尽快继续执行主要任务,因此重要的是尽可能高效地共同处理所有任务。

也无法实现您自己的方法来在内进行偷窃工作ForkJoinPool,因为所有相关部分都不公开。

但是,实际上还有第二种方法可以防止饥饿死锁。这称为 托管阻止
。它不使用工作窃取(以避免上面提到的问题),但是还需要将要阻塞的线程与线程池进行积极协作。使用托管阻塞,线程告诉线程池它可能在被阻塞 之前
它调用潜在的阻塞方法,并在阻塞方法完成时通知池。然后,线程池知道存在饥饿死锁的风险,并且如果其所有线程当前都处于某个阻塞操作中并且还有其他任务要执行,则可能会产生其他线程。请注意,由于附加线程的开销,这效率不如工作窃取。如果您使用普通期货和托管分块来实现递归并行算法,而不是使用ForkJoinTask加上工作偷窃,额外线程的数量可能会非常大(因为在算法的“划分”阶段,将创建许多任务并将其分配给立即阻塞并等待子任务结果的线程)。但是,仍然可以防止出现饥饿死锁,并且避免了一个任务必须等待很长时间的问题,因为该任务的线程同时开始处理另一个任务。

ForkJoinPoolJava的支持托管阻塞。为了使用它,需要实现一个接口ForkJoinPool.ManagedBlocker,以便从block该接口的方法中调用任务要执行的潜在阻塞方法。然后,任务可能不会直接调用阻塞方法,而是需要调用static方法ForkJoinPool.managedBlock(ManagedBlocker)。此方法在阻塞之前和之后处理与线程池的通信。如果当前任务未在内执行ForkJoinPool,那么它也可以工作,它仅调用阻塞方法。

我在Java API(适用于Java 7)中找到的唯一实际使用托管阻塞的地方是class
Phaser。(此类是像互斥锁和闩锁一样的同步屏障,但更灵活,更强大。)因此,与任务Phaser内部同步ForkJoinPool时应使用托管阻塞,并且可以避免饥饿死锁(但ForkJoinTask.join()仍可取,因为它使用工作窃取而不是托管阻塞)
。无论您是ForkJoinPool直接使用还是通过其ExecutorService界面使用,此方法都有效。但是,如果您使用其他任何ExecutorService类(例如由类创建的类)Executors,则将无法使用,因为它们不支持托管阻塞。

在Scala中,托管阻塞的使用更加广泛(description,API)。



 类似资料:
  • 主要内容:死锁,活锁,饥饿,总结本节我们来介绍一下死锁、活锁和饥饿这三个概念。 死锁 死锁是指两个或两个以上的进程(或线程)在执行过程中,因争夺资源而造成的一种互相等待的现象,若无外力作用,它们都将无法推进下去。此时称系统处于死锁状态或系统产生了死锁,这些永远在互相等待的进程称为死锁进程。 死锁发生的条件有如下几种: 1) 互斥条件 线程对资源的访问是排他性的,如果一个线程对占用了某资源,那么其他线程必须处于等待状态,直到该资源

  • 问题内容: 该代码实际上是从Java并发中获取的,根据作者的说法,这里发生了“ ThreadStarvtionDeadlock”。请帮我找到ThreadStarvationDeadlock在这里和哪里发生的情况吗?提前致谢。 问题答案: 死锁和饥饿发生在以下行: 怎么样? 如果我们在程序中添加一些额外的代码,它将发生。可能是这样的: 导致死锁的步骤: 通过实现的类将任务提交给渲染页面。 开始在单独

  • 本文向大家介绍什么是线程死锁?如何避免死锁?相关面试题,主要包含被问及什么是线程死锁?如何避免死锁?时的应答技巧和注意事项,需要的朋友参考一下 认识线程死锁 多个线程同时被阻塞,它们中的一个或者全部都在等待某个资源被释放。由于线程被无限期地阻塞,因此程序不可能正常终止。 如下图所示,线程 A 持有资源 2,线程 B 持有资源 1,他们同时都想申请对方的资源,所以这两个线程就会互相等待而进入死锁状态

  • 主要内容:示例资源分配图是系统状态的图形表示。 顾名思义,资源分配图是关于持有一些资源或等待某些资源的所有进程的完整信息。 它还包含有关所有资源的所有实例的信息,无论这些资源是否可用或正在被进程使用。 在资源分配图中,进程由圆形表示,而资源由矩形表示。 我们来详细看看顶点和边的类型。 顶点主要有两种类型,资源和过程。 它们中的每一个将以不同的形状表示。 Circle代表进程,而矩形代表资源。 一个资源可以有多个

  • 在避免死锁的情况下,如果系统的结果状态不会导致系统中的死锁,那么将会授予对任何资源的请求。系统的状态将持续检查安全和不安全的状态。 为了避免死锁,进程必须告诉OS,进程可以请求完成其执行的最大资源数量。 最简单和最有用的方法指出,流程应声明它可能需要的每种类型的最大资源数量。 死锁避免算法检查资源分配,以便永远不会有循环等待条件。 安全和不安全的状态 系统的资源分配状态可以由可用资源和已分配资源的

  • 我对和使用了RXJava。我知道他们不是在一条流上并行化排放。换句话说,单个排放流只会放在一个线程上,对吗?我下面的测试似乎表明了这一点。我的理解是,您还必须调度器,如,以在单个流上并行化发射。 同样,如果是这种情况,那么调度器会发生线程饥饿吗?如果我的计算调度程序有5个线程,但我有超过5个长时间运行的异步流正在处理,有没有可能出现饥饿?或者这不太可能仅仅是因为RXJava的性质?