当前位置: 首页 > 知识库问答 >
问题:

执行者:如果任务是递归创建的,如何同步等待所有任务完成?

暴夕
2023-03-14

我的问题与这里的问题密切相关。正如在那里发布的,我希望主线程等到工作队列为空并且所有任务都完成。然而,我的问题是,每个任务都可能递归地导致新任务被提交处理。这使得收集所有这些任务的未来有点尴尬。

我们当前的解决方案使用忙等待循环来等待终止:

        do { //Wait until we are done the processing
      try {
        Thread.sleep(200);
      } catch (InterruptedException e) {
        throw new RuntimeException(e);
      }
    } while (!executor.getQueue().isEmpty()
             || numTasks.longValue() > executor.getCompletedTaskCount());

numTasks是一个随着每个新任务的创建而增加的值。这很管用,但我觉得因为等待时间太长,所以不太好。我想知道是否有一种好方法可以让主线程同步等待,直到被显式唤醒。

共有3个答案

尹臻
2023-03-14

这是一个非常有趣的问题。我必须警告,我还没有完全测试代码。

这样做的目的是简单地跟踪任务的执行

  • 如果任务成功排队,计数器将递增一

当调用shutdown并且存在挂起的任务时,委托不会对实际的Executor服务调用shutdown。它将允许对新任务进行排队,直到挂起的任务计数达到零,并在实际ExecutorService上调用shutdown。

public class ResilientExecutorServiceDelegate implements ExecutorService {
    private final ExecutorService executorService;
    private final AtomicInteger pendingTasks;
    private final Lock readLock;
    private final Lock writeLock;
    private boolean isShutdown;

    public ResilientExecutorServiceDelegate(ExecutorService executorService) {
        ReadWriteLock readWriteLock = new ReentrantReadWriteLock();
        this.pendingTasks = new AtomicInteger();
        this.readLock = readWriteLock.readLock();
        this.writeLock = readWriteLock.writeLock();
        this.executorService = executorService;
        this.isShutdown = false;
    }

    private <T> T addTask(Callable<T> task) {
        T result;
        boolean success = false;
        // Increment pending tasks counter
        incrementPendingTaskCount();
        try {
            // Call service
            result = task.call();
            success = true;
        } catch (RuntimeException exception) {
            throw exception;
        } catch (Exception exception) {
            throw new RejectedExecutionException(exception);
        } finally {
            if (!success) {
                // Decrement pending tasks counter
                decrementPendingTaskCount();
            }
        }
        return result;
    }

    private void incrementPendingTaskCount() {
        pendingTasks.incrementAndGet();
    }

    private void decrementPendingTaskCount() {
        readLock.lock();
        if (pendingTasks.decrementAndGet() == 0 && isShutdown) {
            try {
                // Shutdown
                executorService.shutdown();
            } catch (Throwable throwable) {
            }
        }
        readLock.unlock();
    }

    @Override
    public void execute(final Runnable task) {
        // Add task
        addTask(new Callable<Object>() {
            @Override
            public Object call() {
                executorService.execute(new Runnable() {
                    @Override
                    public void run() {
                        try {
                            task.run();
                        } finally {
                            decrementPendingTaskCount();
                        }
                    }
                });
                return null;
            }
        });
    }

    @Override
    public boolean awaitTermination(long timeout, TimeUnit unit)
            throws InterruptedException {
        // Call service
        return executorService.awaitTermination(timeout, unit);
    }

    @Override
    public <T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks)
            throws InterruptedException {
        // It's ok to increment by just one
        incrementPendingTaskCount();
        try {
            return executorService.invokeAll(tasks);
        } finally {
            decrementPendingTaskCount();
        }
    }

    @Override
    public <T> List<Future<T>> invokeAll(
            Collection<? extends Callable<T>> tasks, long timeout, TimeUnit unit)
            throws InterruptedException {
        // It's ok to increment by just one
        incrementPendingTaskCount();
        try {
            return executorService.invokeAll(tasks, timeout, unit);
        } finally {
            decrementPendingTaskCount();
        }
    }

    @Override
    public <T> T invokeAny(Collection<? extends Callable<T>> tasks)
            throws InterruptedException, ExecutionException {
        // It's ok to increment by just one
        incrementPendingTaskCount();
        try {
            return executorService.invokeAny(tasks);
        } finally {
            decrementPendingTaskCount();
        }
    }

    @Override
    public <T> T invokeAny(Collection<? extends Callable<T>> tasks,
            long timeout, TimeUnit unit) throws InterruptedException,
            ExecutionException, TimeoutException {
        incrementPendingTaskCount();
        try {
            return executorService.invokeAny(tasks, timeout, unit);
        } finally {
            decrementPendingTaskCount();
        }
    }

    @Override
    public boolean isShutdown() {
        return isShutdown;
    }

    @Override
    public boolean isTerminated() {
        return executorService.isTerminated();
    }

    @Override
    public void shutdown() {
        // Lock write lock
        writeLock.lock();
        // Set as shutdown
        isShutdown = true;
        try {
            if (pendingTasks.get() == 0) {
                // Real shutdown
                executorService.shutdown();
            }
        } finally {
            // Unlock write lock
            writeLock.unlock();
        }
    }

    @Override
    public List<Runnable> shutdownNow() {
        // Lock write lock
        writeLock.lock();
        // Set as shutdown
        isShutdown = true;
        // Unlock write lock
        writeLock.unlock();

        return executorService.shutdownNow();
    }

    @Override
    public <T> Future<T> submit(final Callable<T> task) {
        // Create execution status
        final FutureExecutionStatus futureExecutionStatus = new FutureExecutionStatus();
        // Add task
        return addTask(new Callable<Future<T>>() {
            @Override
            public Future<T> call() {
                return new FutureDelegate<T>(
                        executorService.submit(new Callable<T>() {
                            @Override
                            public T call() throws Exception {
                                try {
                                    // Mark as executed
                                    futureExecutionStatus.setExecuted();
                                    // Run the actual task
                                    return task.call();
                                } finally {
                                    decrementPendingTaskCount();
                                }
                            }
                        }), futureExecutionStatus);
            }
        });
    }

    @Override
    public Future<?> submit(final Runnable task) {
        // Create execution status
        final FutureExecutionStatus futureExecutionStatus = new FutureExecutionStatus();
        // Add task
        return addTask(new Callable<Future<?>>() {
            @Override
            @SuppressWarnings("unchecked")
            public Future<?> call() {
                return new FutureDelegate<Object>(
                        (Future<Object>) executorService.submit(new Runnable() {
                            @Override
                            public void run() {
                                try {
                                    // Mark as executed
                                    futureExecutionStatus.setExecuted();
                                    // Run the actual task
                                    task.run();
                                } finally {
                                    decrementPendingTaskCount();
                                }
                            }
                        }), futureExecutionStatus);
            }
        });
    }

    @Override
    public <T> Future<T> submit(final Runnable task, final T result) {
        // Create execution status
        final FutureExecutionStatus futureExecutionStatus = new FutureExecutionStatus();
        // Add task
        return addTask(new Callable<Future<T>>() {
            @Override
            public Future<T> call() {
                return new FutureDelegate<T>(executorService.submit(
                        new Runnable() {
                            @Override
                            public void run() {
                                try {
                                    // Mark as executed
                                    futureExecutionStatus.setExecuted();
                                    // Run the actual task
                                    task.run();
                                } finally {
                                    decrementPendingTaskCount();
                                }
                            }
                        }, result), futureExecutionStatus);
            }
        });
    }

    private class FutureExecutionStatus {
        private volatile boolean executed;

        public FutureExecutionStatus() {
            executed = false;
        }

        public void setExecuted() {
            executed = true;
        }

        public boolean isExecuted() {
            return executed;
        }
    }

    private class FutureDelegate<T> implements Future<T> {
        private Future<T> future;
        private FutureExecutionStatus executionStatus;

        public FutureDelegate(Future<T> future,
                FutureExecutionStatus executionStatus) {
            this.future = future;
            this.executionStatus = executionStatus;
        }

        @Override
        public boolean cancel(boolean mayInterruptIfRunning) {
            boolean cancelled = future.cancel(mayInterruptIfRunning);
            if (cancelled) {
                // Lock read lock
                readLock.lock();
                // If task was not executed
                if (!executionStatus.isExecuted()) {
                    decrementPendingTaskCount();
                }
                // Unlock read lock
                readLock.unlock();
            }
            return cancelled;
        }

        @Override
        public T get() throws InterruptedException, ExecutionException {
            return future.get();
        }

        @Override
        public T get(long timeout, TimeUnit unit) throws InterruptedException,
                ExecutionException, TimeoutException {
            return future.get(timeout, unit);
        }

        @Override
        public boolean isCancelled() {
            return future.isCancelled();
        }

        @Override
        public boolean isDone() {
            return future.isDone();
        }
    }
}
司寇阳曦
2023-03-14

Java7提供了一个适合这个用例的同步器,称为Phaser。它是Countdownlock和CyclicBarrier的可重用混合,既可以增加也可以减少注册方的数量(类似于可增加的Countdownlock)。

在这个场景中使用移相器的基本模式是在创建时向移相器注册任务,并在完成时到达。当到达方的数量与注册方的数量匹配时,移相器“前进”到下一个阶段,在前进发生时通知任何等待的线程

这是我创建的等待递归任务完成的示例。它天真地找到斐波那契序列的前几个数字以供演示:

import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.Phaser;
import java.util.concurrent.atomic.AtomicLong;

/**
 * An example of using a Phaser to wait for the completion of recursive tasks.
 * @author Voxelot
 */
public class PhaserExample {
    /** Workstealing threadpool with reduced queue contention. */
    private static ForkJoinPool executors;

    /**
     * @param args the command line arguments
     */
    public static void main(String[] args) throws InterruptedException {
        executors = new ForkJoinPool();
        List<Long> sequence = new ArrayList<>();
        for (int i = 0; i < 20; i++) {
            sequence.add(fib(i));
        }
        System.out.println(sequence);
    }

    /**
     * Computes the nth Fibonacci number in the Fibonacci sequence.
     * @param n The index of the Fibonacci number to compute
     * @return The computed Fibonacci number
     */
    private static Long fib(int n) throws InterruptedException {
        AtomicLong result = new AtomicLong();
        //Flexible sychronization barrier
        Phaser phaser = new Phaser();
        //Base task
        Task initialTask = new Task(n, result, phaser);
        //Register fib(n) calling thread
        phaser.register();
        //Submit base task
        executors.submit(initialTask);
        //Make the calling thread arrive at the synchronization
        //barrier and wait for all future tasks to arrive.
        phaser.arriveAndAwaitAdvance();
        //Get the result of the parallel computation.
        return result.get();
    }

    private static class Task implements Runnable {
        /** The Fibonacci sequence index of this task. */
        private final int index;
        /** The shared result of the computation. */
        private final AtomicLong result;
        /** The synchronizer. */
        private final Phaser phaser;

        public Task(int n, AtomicLong result, Phaser phaser) {
            index = n;
            this.result = result;
            this.phaser = phaser;
            //Inform synchronizer of additional work to complete.
            phaser.register();
        }

        @Override
        public void run() {
            if (index == 1) {
                result.incrementAndGet();
            } else if (index > 1) {
                //recurrence relation: Fn = Fn-1 + Fn-2
                Task task1 = new Task(index - 1, result, phaser);
                Task task2 = new Task(index - 2, result, phaser);
                executors.submit(task1);
                executors.submit(task2);
            }
            //Notify synchronizer of task completion.
            phaser.arrive();
        }
    }
}
巫马磊
2023-03-14

非常感谢你的建议!

最后,我选择了一些我认为相当简单的东西。我发现倒计时锁几乎就是我需要的。它会一直阻塞,直到计数器达到0。唯一的问题是,它只能倒计时,不能倒计时,因此在任务可以提交新任务的动态环境中不起作用。因此,我实现了一个新类CountLatch,它提供了额外的功能。(见下文)我使用的这个类如下。

主线程调用锁存器。等待零(),直到闩锁达到0为止。

任何线程,在调用executor.execute(...)之前调用latch.increment()

任何任务在完成之前都会调用latch。减量()

当最后一个任务终止时,计数器将达到0,从而释放主线程。

欢迎进一步的建议和反馈!

public class CountLatch {

@SuppressWarnings("serial")
private static final class Sync extends AbstractQueuedSynchronizer {

    Sync(int count) {
        setState(count);
    }

    int getCount() {
        return getState();
    }

    protected int tryAcquireShared(int acquires) {
        return (getState() == 0) ? 1 : -1;
    }

    protected int acquireNonBlocking(int acquires) {
        // increment count
        for (;;) {
            int c = getState();
            int nextc = c + 1;
            if (compareAndSetState(c, nextc))
                return 1;
        }
    }

    protected boolean tryReleaseShared(int releases) {
        // Decrement count; signal when transition to zero
        for (;;) {
            int c = getState();
            if (c == 0)
                return false;
            int nextc = c - 1;
            if (compareAndSetState(c, nextc))
                return nextc == 0;
        }
    }
}

private final Sync sync;

public CountLatch(int count) {
    this.sync = new Sync(count);
}

public void awaitZero() throws InterruptedException {
    sync.acquireSharedInterruptibly(1);
}

public boolean awaitZero(long timeout, TimeUnit unit) throws InterruptedException {
    return sync.tryAcquireSharedNanos(1, unit.toNanos(timeout));
}

public void increment() {
    sync.acquireNonBlocking(1);
}

public void decrement() {
    sync.releaseShared(1);
}

public String toString() {
    return super.toString() + "[Count = " + sync.getCount() + "]";
}

}

请注意,增量()/减量()调用可以按照Sami Korhonen的建议封装到自定义的Execitor子类中,或者按照impl的建议使用beforeExecutepostExecute。请参阅此处:

public class CountingThreadPoolExecutor extends ThreadPoolExecutor {

protected final CountLatch numRunningTasks = new CountLatch(0);

public CountingThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit,
        BlockingQueue<Runnable> workQueue) {
    super(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue);
}

@Override
public void execute(Runnable command) {
    numRunningTasks.increment();
    super.execute(command);
}

@Override
protected void afterExecute(Runnable r, Throwable t) {
    numRunningTasks.decrement();
    super.afterExecute(r, t);
}

/**
 * Awaits the completion of all spawned tasks.
 */
public void awaitCompletion() throws InterruptedException {
    numRunningTasks.awaitZero();
}

/**
 * Awaits the completion of all spawned tasks.
 */
public void awaitCompletion(long timeout, TimeUnit unit) throws InterruptedException {
    numRunningTasks.awaitZero(timeout, unit);
}

}
 类似资料:
  • 问题内容: 我的问题与这里的这个问题密切相关。如此处所述,我希望主线程等待,直到工作队列为空并且所有任务都已完成。但是,我的情况是每个任务都可能递归地导致新任务被提交进行处理。这使得收集所有这些任务的未来变得有点尴尬。 我们当前的解决方案使用忙等待循环来等待终止: numTasks是随着创建每个新任务而增加的值。这可以工作,但是由于繁忙的等待,我认为它不是很好。我想知道是否有一个好方法可以使主线程

  • 问题内容: 等待所有任务完成的最简单方法是什么?我的任务主要是计算,所以我只想运行大量的作业-每个内核上一个。现在,我的设置如下所示: 实现可运行。这似乎是正确执行的任务,但代码崩溃上用。这很奇怪,因为我玩了一些玩具示例,而且看起来很奏效。 包含数以万计的元素。我应该使用其他方法吗?我正在寻找尽可能简单的东西 问题答案: 最简单的方法是使用单行代码执行所需的操作。用你的话来说,你需要修改或包装以实

  • 我正在运行的服务器应用程序获取我要使用任务系统处理的多个任务请求。 每个任务都表示为一个可运行的线程池,该线程池中的线程数小于或等于线程池的大小,需要线程池中的线程数。当然,线程池是必要的,以避免CPU因线程过多而过载。 然而,其中一些任务可以是多线程的,而另一些则不能。这就是为什么一个任务可能需要等待其所有特定线程完成,以便合并这些线程的结果以获得最终结果的原因。 如果使用多个实例,则可以像这样

  • 我正在使用Spring 4.3.8。发布Java7。我想创建一个线程工厂来帮助管理应用程序中的某些工作人员。我像这样声明我的线程工厂 但是,我在线程上“加入”有困难。也就是说,我想在继续某个任务之前等待所有工作完成,所以我有 我的线程池是这样执行的 然而打印出来的是 所以很明显我没有等待。等待我的线程完成工作的正确方法是什么?

  • 问题内容: 我使用来执行任务。该任务可以递归创建提交给同一任务的其他任务,那些子任务也可以做到这一点。 我现在遇到的问题是,我要等到所有任务都完成(即所有任务都已完成并且它们没有提交新任务)后再继续。 我无法在主线程中调用,因为这会阻止接受新任务。 如果没有被呼叫,呼叫似乎无能为力。 所以我有点卡在这里。看到所有工人都闲着不难,不是吗?我能想到的唯一优雅的解决方案是直接使用a 并偶尔查询一次。真的

  • 问题内容: 我只希望我的主线程在退出之前等待我所有的(p)线程完成。 线程来回移动的原因很多,我真的不想跟踪所有线程-我只想知道线程什么时候消失了。 wait()对子进程执行此操作,在没有子进程时返回ECHILD,但是wait(似乎不与(p)个线程一起工作)。 我真的不想麻烦保留每个未完成线程的列表(随它们来来去去),然后必须在每个线程上调用pthread_join。 有没有一种快速而又肮脏的方式