我的问题与这里的这个问题密切相关。如此处所述,我希望主线程等待,直到工作队列为空并且所有任务都已完成。但是,我的情况是每个任务都可能递归地导致新任务被提交进行处理。这使得收集所有这些任务的未来变得有点尴尬。
我们当前的解决方案使用忙等待循环来等待终止:
do { //Wait until we are done the processing
try {
Thread.sleep(200);
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
} while (!executor.getQueue().isEmpty()
|| numTasks.longValue() > executor.getCompletedTaskCount());
numTasks是随着创建每个新任务而增加的值。这可以工作,但是由于繁忙的等待,我认为它不是很好。我想知道是否有一个好方法可以使主线程同步等待,直到被显式唤醒。
非常感谢您的所有建议!
最后,我选择了一些我认为相当简单的东西。我发现CountDownLatch几乎是我所需要的。它一直阻塞直到计数器达到0。唯一的问题是它只能递减计数,不能递减计数,因此不能在我有任务可以提交新任务的动态设置中使用。因此,我实现了一个CountLatch
提供附加功能的新类。(请参阅下文)然后,按如下方式使用此类。
主线程调用latch.awaitZero()
,直到锁存器到达0为止。
任何线程,在调用executor.execute(..)
calls 之前latch.increment()
。
在完成之前,任何任务都会调用latch.decrement()
。
当最后一个任务终止时,计数器将达到0,从而释放主线程。
欢迎进一步的建议和反馈!
public class CountLatch {
@SuppressWarnings("serial")
private static final class Sync extends AbstractQueuedSynchronizer {
Sync(int count) {
setState(count);
}
int getCount() {
return getState();
}
protected int tryAcquireShared(int acquires) {
return (getState() == 0) ? 1 : -1;
}
protected int acquireNonBlocking(int acquires) {
// increment count
for (;;) {
int c = getState();
int nextc = c + 1;
if (compareAndSetState(c, nextc))
return 1;
}
}
protected boolean tryReleaseShared(int releases) {
// Decrement count; signal when transition to zero
for (;;) {
int c = getState();
if (c == 0)
return false;
int nextc = c - 1;
if (compareAndSetState(c, nextc))
return nextc == 0;
}
}
}
private final Sync sync;
public CountLatch(int count) {
this.sync = new Sync(count);
}
public void awaitZero() throws InterruptedException {
sync.acquireSharedInterruptibly(1);
}
public boolean awaitZero(long timeout, TimeUnit unit) throws InterruptedException {
return sync.tryAcquireSharedNanos(1, unit.toNanos(timeout));
}
public void increment() {
sync.acquireNonBlocking(1);
}
public void decrement() {
sync.releaseShared(1);
}
public String toString() {
return super.toString() + "[Count = " + sync.getCount() + "]";
}
}
请注意,可以将increment()
/ decrement()
调用封装到自定义的Executor
子类中,如Sami Korhonen
所建议的那样,或者与beforeExecute
和afterExecute
一起由impl所建议的那样。看这里:
public class CountingThreadPoolExecutor extends ThreadPoolExecutor {
protected final CountLatch numRunningTasks = new CountLatch(0);
public CountingThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit,
BlockingQueue<Runnable> workQueue) {
super(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue);
}
@Override
public void execute(Runnable command) {
numRunningTasks.increment();
super.execute(command);
}
@Override
protected void afterExecute(Runnable r, Throwable t) {
numRunningTasks.decrement();
super.afterExecute(r, t);
}
/**
* Awaits the completion of all spawned tasks.
*/
public void awaitCompletion() throws InterruptedException {
numRunningTasks.awaitZero();
}
/**
* Awaits the completion of all spawned tasks.
*/
public void awaitCompletion(long timeout, TimeUnit unit) throws InterruptedException {
numRunningTasks.awaitZero(timeout, unit);
}
}
我的问题与这里的问题密切相关。正如在那里发布的,我希望主线程等到工作队列为空并且所有任务都完成。然而,我的问题是,每个任务都可能递归地导致新任务被提交处理。这使得收集所有这些任务的未来有点尴尬。 我们当前的解决方案使用忙等待循环来等待终止: numTasks是一个随着每个新任务的创建而增加的值。这很管用,但我觉得因为等待时间太长,所以不太好。我想知道是否有一种好方法可以让主线程同步等待,直到被显式
问题内容: 等待所有任务完成的最简单方法是什么?我的任务主要是计算,所以我只想运行大量的作业-每个内核上一个。现在,我的设置如下所示: 实现可运行。这似乎是正确执行的任务,但代码崩溃上用。这很奇怪,因为我玩了一些玩具示例,而且看起来很奏效。 包含数以万计的元素。我应该使用其他方法吗?我正在寻找尽可能简单的东西 问题答案: 最简单的方法是使用单行代码执行所需的操作。用你的话来说,你需要修改或包装以实
我正在使用Spring 4.3.8。发布Java7。我想创建一个线程工厂来帮助管理应用程序中的某些工作人员。我像这样声明我的线程工厂 但是,我在线程上“加入”有困难。也就是说,我想在继续某个任务之前等待所有工作完成,所以我有 我的线程池是这样执行的 然而打印出来的是 所以很明显我没有等待。等待我的线程完成工作的正确方法是什么?
我正在运行的服务器应用程序获取我要使用任务系统处理的多个任务请求。 每个任务都表示为一个可运行的线程池,该线程池中的线程数小于或等于线程池的大小,需要线程池中的线程数。当然,线程池是必要的,以避免CPU因线程过多而过载。 然而,其中一些任务可以是多线程的,而另一些则不能。这就是为什么一个任务可能需要等待其所有特定线程完成,以便合并这些线程的结果以获得最终结果的原因。 如果使用多个实例,则可以像这样
问题内容: 我只希望我的主线程在退出之前等待我所有的(p)线程完成。 线程来回移动的原因很多,我真的不想跟踪所有线程-我只想知道线程什么时候消失了。 wait()对子进程执行此操作,在没有子进程时返回ECHILD,但是wait(似乎不与(p)个线程一起工作)。 我真的不想麻烦保留每个未完成线程的列表(随它们来来去去),然后必须在每个线程上调用pthread_join。 有没有一种快速而又肮脏的方式
问题内容: 我使用来执行任务。该任务可以递归创建提交给同一任务的其他任务,那些子任务也可以做到这一点。 我现在遇到的问题是,我要等到所有任务都完成(即所有任务都已完成并且它们没有提交新任务)后再继续。 我无法在主线程中调用,因为这会阻止接受新任务。 如果没有被呼叫,呼叫似乎无能为力。 所以我有点卡在这里。看到所有工人都闲着不难,不是吗?我能想到的唯一优雅的解决方案是直接使用a 并偶尔查询一次。真的