我必须在一个系统中管理计划的文件复制。文件复制是由用户安排的,我需要限制复制期间使用的系统资源数量。没有定义每次复制可能需要的时间(即,可能计划每15分钟运行一次复制,并且在下一次运行到期时,上一次运行可能仍在运行)。
我有一个调度器,它定期检查到期的文件复制,对于每个文件复制,(1)如果它没有排队也没有运行,就将它添加到阻塞队列中;(2)否则就删除它。
private final Object scheduledReplicationsLock = new Object();
private final BlockingQueue<Replication> replicationQueue = new LinkedBlockingQueue<>();
private final Set<Long> queuedReplicationIds = new HashSet<>();
private final Set<Long> runningReplicationIds = new HashSet<>();
public boolean add(Replication replication) {
synchronized (scheduledReplicationsLock) {
// If the replication job is either still executing or is already queued, do not add it.
if (queuedReplicationIds.contains(replication.id) || runningReplicationIds.contains(replication.id)) {
return false;
}
replicationQueue.add(replication)
queuedReplicationIds.add(replication.id);
}
我还有一个线程池,等待直到队列中有复制并执行它。下面是线程池中每个线程的主要方法:
public void run() {
while (True) {
Replication replication = null;
synchronized (scheduledReplicationsLock) {
// This will block until a replication job is ready to be run or the current thread is interrupted.
replication = replicationQueue.take();
// Move the ID value out of the queued set and into the active set
Long replicationId = replication.getId();
queuedReplicationIds.remove(replicationId);
runningReplicationIds.add(replicationId);
}
executeReplication(replication)
}
}
保持相同的控制流,而不是在持有互斥锁的同时阻塞BlockingQueue
实例,您可以Wait
等待ScheduleDreplicationsLock
的通知,强制工作线程释放锁并返回到等待池。
下面是您的生产者的缩小样本:
private final List<Replication> replicationQueue = new LinkedList<>();
private final Set<Long> runningReplicationIds = new HashSet<>();
public boolean add(Replication replication) {
synchronized (replicationQueue) {
// If the replication job is either still executing or is already queued, do not add it.
if (replicationQueue.contains(replication) || runningReplicationIds.contains(replication.id)) {
return false;
} else {
replicationQueue.add(replication);
replicationQueue.notifyAll();
}
}
}
工作线程runnable
将按以下方式更新:
public void run() {
synchronized (replicationQueue) {
while (true) {
if (replicationQueue.isEmpty()) {
scheduledReplicationsLock.wait();
}
if (!replicationQueue.isEmpty()) {
Replication replication = replicationQueue.poll();
runningReplicationIds.add(replication.getId())
executeReplication(replication);
}
}
}
}
通常,您最好使用blockingqueue
来协调您的生产者和复制工作者池。
顾名思义,blockingqueue
本质上是阻塞,只有当无法从队列中拉出/推入项时,调用线程才会阻塞。
同时,请注意,您必须更新正在运行的/已进入队列的状态管理,因为您将只同步blockingqueue
项,删除任何约束。这将取决于上下文,这是否可以接受。
private final BlockingQueue<Replication> replicationQueue = new LinkedBlockingQueue<>();
public boolean add(Replication replication) {
// not sure if this is the proper invariant to check as at some point the replication would be neither queued nor running while still have been processed
if (replicationQueue.contains(replication)) {
return false;
}
// use `put` instead of `add` as this will block waiting for free space
replicationQueue.put(replication);
return true;
}
public void run() {
while (true) {
Replication replication = replicationQueue.take();
executeReplication(replication);
}
}
问题内容: 我已经看到了线程池执行程序的实现及其所提供的拒绝执行策略。但是,我有一个自定义要求- 我想拥有一个回调机制,在该机制中,当达到队列大小限制时,我会收到通知,并说何时队列大小减少到最大允许队列大小的80%。 我觉得可以通过子类化线程池执行程序来实现,但是已经有一个实现的版本吗?我很乐意在需要时提供更多详细信息和我的工作,以便提供清晰的信息。 问题答案: 我希望有一个回调机制,当达到队列大
问题内容: 我正在使用的类创建用于运行Web服务器的请求处理程序的固定线程池: 并且说明是: 创建一个线程池,该线程池重用在共享的 无边界 队列上运行的一组固定线程。 但是,我正在寻找实现与缓冲池完全相同的线程池实现,除了使用有 界 队列。有这样的实现吗?还是我需要为固定线程池实现自己的包装器? 问题答案: 您想要做的是新建自己的ExecutorService,可能使用ThreadPoolExec
我正计划创建可调整队列大小的可调整线程池。我正在使用unbounded LinkedBlockingQueue和一个外部设置,该设置控制排队的消息数量。最初,my corepoolsize和maxpoolsize是相等的。现在,如果我想在运行时更新我的线程池大小,我通过一个公共设置将corepoolsize和maxpoolsize设置为不同的值。我想知道你对这种做法有什么看法。 当maxpools
主要内容:一、MySql中的线程,二、主要方式,三、源码流程,四、总结一、MySql中的线程 在mysql中,每一个连接上来,就会分配给一个相关的THD数据类。在前面的分析中可以看到,连接器(Connectors)连接到的直接就是连接池,在连接池的线程处理中分为三部分,即一对一(一个连接对应一个线程),多对一(多个连接对应一个线程)和线程池(多对多)。 线程池和线程可以针对不同的具体场景来处理具体的事务,这样既兼顾了效率又提高了适应性,对于新手来说,这就是设计的一个
基础的FIFO队列 # queue_fifo.py import queue q = queue.Queue() for i in range(5): q.put(i) while not q.empty(): print(q.get(), end=' ') print() LIFO队列 # queue_lifo.py import queue q = queue.Lif
在使用TensorFlow进行异步计算时,队列是一种强大的机制。 正如TensorFlow中的其他组件一样,队列就是TensorFlow图中的节点。这是一种有状态的节点,就像变量一样:其他节点可以修改它的内容。具体来说,其他节点可以把新元素插入到队列后端(rear),也可以把队列前端(front)的元素删除。 为了感受一下队列,让我们来看一个简单的例子。我们先创建一个“先入先出”的队列(FIFOQ