当前位置: 首页 > 知识库问答 >
问题:

具有唯一项和线程池的线程安全FIFO队列

林俭
2023-03-14

我必须在一个系统中管理计划的文件复制。文件复制是由用户安排的,我需要限制复制期间使用的系统资源数量。没有定义每次复制可能需要的时间(即,可能计划每15分钟运行一次复制,并且在下一次运行到期时,上一次运行可能仍在运行)。

我有一个调度器,它定期检查到期的文件复制,对于每个文件复制,(1)如果它没有排队也没有运行,就将它添加到阻塞队列中;(2)否则就删除它。

private final Object scheduledReplicationsLock = new Object();
private final BlockingQueue<Replication> replicationQueue = new LinkedBlockingQueue<>();
private final Set<Long> queuedReplicationIds = new HashSet<>();
private final Set<Long> runningReplicationIds = new HashSet<>();

public boolean add(Replication replication) {

    synchronized (scheduledReplicationsLock) {
        // If the replication job is either still executing or is already queued, do not add it.
        if (queuedReplicationIds.contains(replication.id) || runningReplicationIds.contains(replication.id)) {
            return false;
        }
        replicationQueue.add(replication)
        queuedReplicationIds.add(replication.id);
    }

我还有一个线程池,等待直到队列中有复制并执行它。下面是线程池中每个线程的主要方法:

public void run() {
    while (True) {
        Replication replication = null;
        synchronized (scheduledReplicationsLock) {
            // This will block until a replication job is ready to be run or the current thread is interrupted.
            replication = replicationQueue.take();

            // Move the ID value out of the queued set and into the active set
            Long replicationId = replication.getId();
            queuedReplicationIds.remove(replicationId);
            runningReplicationIds.add(replicationId);
        }
        executeReplication(replication)
    }
} 

共有1个答案

曾奇略
2023-03-14

保持相同的控制流,而不是在持有互斥锁的同时阻塞BlockingQueue实例,您可以Wait等待ScheduleDreplicationsLock的通知,强制工作线程释放锁并返回到等待池。

下面是您的生产者的缩小样本:

private final List<Replication> replicationQueue = new LinkedList<>();
private final Set<Long> runningReplicationIds = new HashSet<>();

public boolean add(Replication replication) {
    synchronized (replicationQueue) {
        // If the replication job is either still executing or is already queued, do not add it.
        if (replicationQueue.contains(replication) || runningReplicationIds.contains(replication.id)) {
            return false;
        } else {
            replicationQueue.add(replication);
            replicationQueue.notifyAll();
        }
    }
}

工作线程runnable将按以下方式更新:

public void run() {
    synchronized (replicationQueue) {
        while (true) {
            if (replicationQueue.isEmpty()) {
                scheduledReplicationsLock.wait();
            }
            if (!replicationQueue.isEmpty()) {
                Replication replication = replicationQueue.poll();
                runningReplicationIds.add(replication.getId())
                executeReplication(replication);
            }
        }
    }
} 

通常,您最好使用blockingqueue来协调您的生产者和复制工作者池。

顾名思义,blockingqueue本质上是阻塞,只有当无法从队列中拉出/推入项时,调用线程才会阻塞。

同时,请注意,您必须更新正在运行的/已进入队列的状态管理,因为您将只同步blockingqueue项,删除任何约束。这将取决于上下文,这是否可以接受。

private final BlockingQueue<Replication> replicationQueue = new LinkedBlockingQueue<>();

public boolean add(Replication replication) {
    // not sure if this is the proper invariant to check as at some point the replication would be neither queued nor running while still have been processed
    if (replicationQueue.contains(replication)) {
        return false;
    }
    // use `put` instead of `add` as this will block waiting for free space
    replicationQueue.put(replication);
    return true;
}
public void run() {
    while (true) {
        Replication replication = replicationQueue.take();
        executeReplication(replication);
    }
} 
 类似资料:
  • 问题内容: 我已经看到了线程池执行程序的实现及其所提供的拒绝执行策略。但是,我有一个自定义要求- 我想拥有一个回调机制,在该机制中,当达到队列大小限制时,我会收到通知,并说何时队列大小减少到最大允许队列大小的80%。 我觉得可以通过子类化线程池执行程序来实现,但是已经有一个实现的版本吗?我很乐意在需要时提供更多详细信息和我的工作,以便提供清晰的信息。 问题答案: 我希望有一个回调机制,当达到队列大

  • 问题内容: 我正在使用的类创建用于运行Web服务器的请求处理程序的固定线程池: 并且说明是: 创建一个线程池,该线程池重用在共享的 无边界 队列上运行的一组固定线程。 但是,我正在寻找实现与缓冲池完全相同的线程池实现,除了使用有 界 队列。有这样的实现吗?还是我需要为固定线程池实现自己的包装器? 问题答案: 您想要做的是新建自己的ExecutorService,可能使用ThreadPoolExec

  • 我正计划创建可调整队列大小的可调整线程池。我正在使用unbounded LinkedBlockingQueue和一个外部设置,该设置控制排队的消息数量。最初,my corepoolsize和maxpoolsize是相等的。现在,如果我想在运行时更新我的线程池大小,我通过一个公共设置将corepoolsize和maxpoolsize设置为不同的值。我想知道你对这种做法有什么看法。 当maxpools

  • 主要内容:一、MySql中的线程,二、主要方式,三、源码流程,四、总结一、MySql中的线程 在mysql中,每一个连接上来,就会分配给一个相关的THD数据类。在前面的分析中可以看到,连接器(Connectors)连接到的直接就是连接池,在连接池的线程处理中分为三部分,即一对一(一个连接对应一个线程),多对一(多个连接对应一个线程)和线程池(多对多)。 线程池和线程可以针对不同的具体场景来处理具体的事务,这样既兼顾了效率又提高了适应性,对于新手来说,这就是设计的一个

  • 基础的FIFO队列 # queue_fifo.py import queue q = queue.Queue() for i in range(5): q.put(i) while not q.empty(): print(q.get(), end=' ') print() LIFO队列 # queue_lifo.py import queue q = queue.Lif

  • 在使用TensorFlow进行异步计算时,队列是一种强大的机制。 正如TensorFlow中的其他组件一样,队列就是TensorFlow图中的节点。这是一种有状态的节点,就像变量一样:其他节点可以修改它的内容。具体来说,其他节点可以把新元素插入到队列后端(rear),也可以把队列前端(front)的元素删除。 为了感受一下队列,让我们来看一个简单的例子。我们先创建一个“先入先出”的队列(FIFOQ