我正在尝试编写一个解决方案,其中单个线程产生可以并行执行的I/O密集型任务。每个任务都有重要的内存数据。所以我希望能够限制一次挂起的任务数量。
如果我像这样创建ThreadPoolExecator:
ThreadPoolExecutor executor = new ThreadPoolExecutor(numWorkerThreads, numWorkerThreads,
0L, TimeUnit.MILLISECONDS,
new LinkedBlockingQueue<Runnable>(maxQueue));
然后是执行器。submit(可调用)在队列已满且所有线程都已忙时抛出RejectedExecutionException。
我该怎么做才能成为执行者。提交(可调用)在队列已满且所有线程都忙时阻止?
编辑:我尝试了以下操作:
executor.setRejectedExecutionHandler(new ThreadPoolExecutor.CallerRunsPolicy());
它在某种程度上实现了我想要实现的效果,但以一种不优雅的方式(基本上被拒绝的线程在调用线程中运行,因此这会阻止调用线程提交更多)。
编辑:(提问后5年)
对于阅读此问题及其答案的任何人,请不要将已接受的答案视为一个正确答案。请通读所有答案和评论。
当前接受的答案有一个潜在的重大问题-它改变了ThreadPoolExecutor的行为。执行时,如果您有一个corePoolSize
来自ThreadPoolExecutor。执行(可运行):
if (isRunning(c) && workQueue.offer(command)) {
int recheck = ctl.get();
if (! isRunning(recheck) && remove(command))
reject(command);
else if (workerCountOf(recheck) == 0)
addWorker(null, false);
}
else if (!addWorker(command, false))
reject(command);
具体来说,最后的“else”块永远不会被击中。
一个更好的替代方案是做一些类似于OP已经在做的事情-使用Reject tedExecutionHandler执行相同的put
逻辑:
public void rejectedExecution(Runnable r, ThreadPoolExecutor executor) {
try {
if (!executor.isShutdown()) {
executor.getQueue().put(r);
}
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
throw new RejectedExecutionException("Executor was interrupted while the task was waiting to put on work queue", e);
}
}
正如评论中所指出的(参考此答案),这种方法需要注意一些事情:
记住这些问题,此解决方案将适用于大多数典型的ThreadPoolExecutors,并将正确处理corePoolSize
这是我如何解决这个问题的:
(注意:此解决方案会阻止提交可调用的线程,因此可以防止抛出RejectedExecutionException)
public class BoundedExecutor extends ThreadPoolExecutor{
private final Semaphore semaphore;
public BoundedExecutor(int bound) {
super(bound, Integer.MAX_VALUE, 60L, TimeUnit.SECONDS, new SynchronousQueue<Runnable>());
semaphore = new Semaphore(bound);
}
/**Submits task to execution pool, but blocks while number of running threads
* has reached the bound limit
*/
public <T> Future<T> submitButBlockIfFull(final Callable<T> task) throws InterruptedException{
semaphore.acquire();
return submit(task);
}
@Override
protected void afterExecute(Runnable r, Throwable t) {
super.afterExecute(r, t);
semaphore.release();
}
}
我也做过同样的事。诀窍是创建一个BlockingQueue,其中offer()方法实际上是put()。(您可以使用所需的任何基本BlockingQueue impl)。
public class LimitedQueue<E> extends LinkedBlockingQueue<E>
{
public LimitedQueue(int maxSize)
{
super(maxSize);
}
@Override
public boolean offer(E e)
{
// turn offer() and add() into a blocking calls (unless interrupted)
try {
put(e);
return true;
} catch(InterruptedException ie) {
Thread.currentThread().interrupt();
}
return false;
}
}
注意,这只适用于corePoolSize==maxPoolSize的线程池,所以要小心(参见注释)。
问题内容: 我正在尝试编写一个解决方案,其中单个线程会生成可并行执行的I / O密集型任务。每个任务都有重要的内存数据。因此,我希望能够限制当前待处理的任务数。 如果我这样创建ThreadPoolExecutor: 然后,当队列已满并且所有线程都已经繁忙时,抛出该异常。 当队列已满并且所有线程都忙时,我该怎么做才能阻塞? 编辑 :我试过了: 它以某种微妙的方式达到了我想要达到的效果(基本上被拒绝的
问题内容: 我搜索了一堆页面,但是找不到我的问题,所以我不得不发表一个帖子。 我有一个带有提交按钮的表单,提交后我不希望它刷新或重定向。我只希望jQuery执行功能。 形式如下: 这是jQuery: 问题答案: 只需处理Submit事件中的表单提交,然后返回false: 您不再需要提交按钮上的onclick事件:
我不知所措。我有一个阻塞请求 此代码通常会阻塞<code>队列。takeFirst()当队列中没有项目时。但是,一旦我按预期添加了项目,它就不会解除阻止。调试时,我可以看到中的项目,并且当我停止Tomcat时,我会序列化。启动后,我将对队列进行反序列化,并在那一点上对<code>队列进行反串行化。takeFirst()检索项目(与之前未检索的相同)并提交它。 有人有什么想法吗? 编辑 再强调一下我
假设,我有一个服务A,它有一个线程池执行器来调用服务B。我们可以用自己的值设置该池的核心池大小和队列。现在,服务B对请求的响应速度很慢,因为服务A线程池中的活动线程增加,导致阻塞队列大小增加。如何防止服务A的队列大小增加? 背景
我想要实现的是,我有一个来自UI的REST调用,它调用添加一个用户。因此,用户必须执行异步队列(这是一个约束),但在将结果发送回UI之前,必须等待响应队列一段配置的时间并对其进行处理。如果队列返回时引用号为空,那么我必须删除用户记录并抛出异常,说明用户无效。如果响应返回时引用有效(或者超时发生),那么我假设它有效并返回success。 我有一个应用程序,我在其中发送一条队列消息,以获取我的用户对象
下面是两种主要的方法。 代码: