当前位置: 首页 > 面试题库 >

阻塞队列和多线程使用者,如何知道何时停止

杭英杰
2023-03-14
问题内容

我有一个单一的线程生产者,它创建了一些任务对象,然后将它们添加到一个ArrayBlockingQueue(大小固定的)对象中。

我还启动了一个多线程使用者。这是作为固定线程池(Executors.newFixedThreadPool(threadCount);)构建的。然后,我向该threadPool提交一些ConsumerWorker实例,每个ConsumerWorker都引用了上述ArrayBlockingQueue实例

每个这样的Worker将take()在队列中执行并处理任务。

我的问题是,什么时候不再需要其他工作,让工人知道的最佳方法是什么。换句话说,我如何告诉工人生产者已经完成添加到队列,从这一点开始,每个工人在看到队列为空时都应该停止。

我现在得到的是一个设置,其中,生产者使用一个回调初始化,当他完成工作时(将内容添加到队列中),该回调将被触发。我还保留了我创建并提交给ThreadPool的所有ConsumerWorkers的列表。当生产者回叫告诉我生产者完成时,我可以告诉每个工人。在这一点上,他们应该继续检查队列是否不为空,当队列为空时,它们应该停止,从而使我能够正常关闭ExecutorService线程池。是这样的

public class ConsumerWorker implements Runnable{

private BlockingQueue<Produced> inputQueue;
private volatile boolean isRunning = true;

public ConsumerWorker(BlockingQueue<Produced> inputQueue) {
    this.inputQueue = inputQueue;
}

@Override
public void run() {
    //worker loop keeps taking en element from the queue as long as the producer is still running or as 
    //long as the queue is not empty:
    while(isRunning || !inputQueue.isEmpty()) {
        System.out.println("Consumer "+Thread.currentThread().getName()+" START");
        try {
            Object queueElement = inputQueue.take();
            //process queueElement
        } catch (Exception e) {
            e.printStackTrace();
        }
    }
}

//this is used to signal from the main thread that he producer has finished adding stuff to the queue
public void setRunning(boolean isRunning) {
    this.isRunning = isRunning;
}

}

这里的问题是我有一个明显的竞争条件,有时生产者会完成,发信号,而ConsumerWorkers会在消耗队列中的所有内容之前停止。

我的问题是什么是最好的同步方式,以便一切正常?我是否应该同步检查生产者是否正在运行的整个部分,以及队列是否为空,并在一个块中(在队列对象上)从队列中取出内容?我是否应该仅isRunning在ConsumerWorker实例上同步布尔值的更新?还有其他建议吗?

更新,这里是我最终使用的工作实现:

public class ConsumerWorker implements Runnable{

private BlockingQueue<Produced> inputQueue;

private final static Produced POISON = new Produced(-1);

public ConsumerWorker(BlockingQueue<Produced> inputQueue) {
    this.inputQueue = inputQueue;
}

@Override
public void run() {
    //worker loop keeps taking en element from the queue as long as the producer is still running or as 
    //long as the queue is not empty:
    while(true) {
        System.out.println("Consumer "+Thread.currentThread().getName()+" START");
        try {
            Produced queueElement = inputQueue.take();
            Thread.sleep(new Random().nextInt(100));
            if(queueElement==POISON) {
                break;
            }
            //process queueElement
        } catch (Exception e) {
            e.printStackTrace();
        }
        System.out.println("Consumer "+Thread.currentThread().getName()+" END");
    }
}

//this is used to signal from the main thread that he producer has finished adding stuff to the queue
public void stopRunning() {
    try {
        inputQueue.put(POISON);
    } catch (InterruptedException e) {
        // TODO Auto-generated catch block
        e.printStackTrace();
    }
}

}

这主要是受到JohnVint的以下回答的启发,仅作了一些小改动。

===由于@vendhan的评论而进行了更新。

谢谢您的观察。没错,这个问题的第一个代码片段(以及其他问题)while(isRunning || !inputQueue.isEmpty())确实没有什么意义。

在此的最终实际实现中,我所做的事情与您建议替换“ ||”更接近 (或)和“
&&”(和),从某种意义上说,每个工人(消费者)现在仅检查从列表中得到的元素是否是毒药,如果停止,则停止(因此理论上我们可以说该工人有且队列不能为空)。


问题答案:

您应该take()从队列继续。您可以使用毒药告诉工人停下来。例如:

private final Object POISON_PILL = new Object();

@Override
public void run() {
    //worker loop keeps taking en element from the queue as long as the producer is still running or as 
    //long as the queue is not empty:
    while(isRunning) {
        System.out.println("Consumer "+Thread.currentThread().getName()+" START");
        try {
            Object queueElement = inputQueue.take();
            if(queueElement == POISON_PILL) {
                 inputQueue.add(POISON_PILL);//notify other threads to stop
                 return;
            }
            //process queueElement
        } catch (Exception e) {
            e.printStackTrace();
        }
    }
}

//this is used to signal from the main thread that he producer has finished adding stuff to the queue
public void finish() {
    //you can also clear here if you wanted
    isRunning = false;
    inputQueue.add(POISON_PILL);
}


 类似资料:
  • 我编写了一个简单的类,我计划将其扩展为客户端套接字编程应用程序的一部分。类涉及一个BlockingQueue(我从这里复制了代码:相当于Java的BlockingQueue的C++)。当我创建了下面的包装类的一个实例后,我打算让它生成一个单独的线程,该线程只需执行BlockingQueue上阻塞的printer()函数,直到有一个或多个字符串可用,然后它只需将字符串打印到控制台窗口。在我的预期应用

  • 1. 什么是阻塞队列? 阻塞队列(BlockingQueue)是一个支持两个附加操作的队列。这两个附加的操作是:在队列为空时,获取元素的线程会等待队列变为非空。当队列满时,存储元素的线程会等待队列可用。阻塞队列常用于生产者和消费者的场景,生产者是往队列里添加元素的线程,消费者是从队列里拿元素的线程。阻塞队列就是生产者存放元素的容器,而消费者也只从容器里拿元素。 阻塞队列提供了四种处理方法 方法\处

  • 我在我产品环境中发现了一个问题。 我们在一个mq集群中有6个队列,我们有200个线程的线程池(实际上会更多,因为它会在一个独立的线程池中安排一些特殊任务)来处理来自上游的请求,当处理请求时,我会发布一个消息给rabbitmq Broker。 所以我有200个线程将消息发布到这6个队列。 对于每个队列,我将创建一个AMQP连接,对于每个线程,我有一个Channel的threadlocal,这样每个线

  • 问题内容: 我有两个分开的阻塞队列。客户端通常使用第二个阻塞队列中的第一个来检索要处理的元素。 在某些情况下,客户端对两个阻塞队列中的元素感兴趣,无论哪个队列首先提供数据。 客户端如何并行等待两个队列? 问题答案: 您可以尝试在某种循环中使用该方法,以仅在指定时间量内等待一个队列,然后再轮询另一个队列。 除此之外,我会说在另一个线程上为每个队列运行阻塞操作并为您的主应用程序提供回调接口是另一个稍微

  • 我正在编写一个有3个线程的程序。一个读取一个文本文件并将单词输入到大小为2的ArraylistBlockingQueue中。下一个获取该列表并反转其中的每个其他单词。最后一个线程获取单词并将它们写入一个新的文本文件。 我所有的东西都在工作,除了我不知道如何中断和停止我的线程。程序写入文本文件,但从未结束。 主要方法 输入 反向类@重写公共void run(){ 输出代码@覆盖公共无效run(){

  • 我正在使用(java)LinkedBlockingQueue创建资源池,其中 资源元素是等价的,属于一个池,它们的排序是无关紧要的。 消费者是竞争线程,一次抓取一个资源,使用“拉”操作,处理资源,然后通过“添加”操作将其返回给池。 当特定资源被消费者线程使用时,它不得对其他消费者线程可用。 问题是:LinkedBlockingQueue不会对等待的消费者进行FIFO,而且服务水平也不统一。 关于这