当前位置: 首页 > 面试题库 >

“关闭”阻塞队列

袁单鹗
2023-03-14
问题内容

我在一个非常简单的生产者-消费者场景中使用 java.util.concurrent.BlockingQueue 。例如,此伪代码描述了使用者部分:

class QueueConsumer implements Runnable {

    @Override
    public void run() {
        while(true)
        {
            try {
                ComplexObject complexObject = myBlockingQueue.take();
                //do something with the complex object
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        }
    }
}

到目前为止,一切都很好。在阻塞队列的javadoc中,我读到:

BlockingQueue本质上不支持任何类型的“关闭”或“关闭”操作,以指示将不再添加任何项目。这些功能的需求和使用往往取决于实现。例如,一种常见的策略是让生产者插入特殊的流尾对象或有毒对象,这些特殊对象在被消费者拿走时会被相应地解释。

不幸的是,由于使用了泛型以及ComplexObject的性质,将“毒物”推入队列并不是一件容易的事。因此,在我的情况下,这种“通用策略”并不是很方便。

我的问题是:我还可以使用哪些其他好的策略/模式来“关闭”队列?

谢谢!


问题答案:

如果您拥有使用者线程的句柄,则可以中断它。使用您提供的代码,将杀死消费者。我不希望制片人有这个。它可能必须以某种方式回调到程序控制器,以使其知道已完成。然后,控制器将中断使用者线程。

您总是可以在服从中断之前完成工作。例如:

class QueueConsumer implements Runnable {
    @Override
    public void run() {
        while(!(Thread.currentThread().isInterrupted())) {
            try {
                final ComplexObject complexObject = myBlockingQueue.take();
                this.process(complexObject);

            } catch (InterruptedException e) {
                // Set interrupted flag.
                Thread.currentThread().interrupt();
            }
        }

        // Thread is getting ready to die, but first,
        // drain remaining elements on the queue and process them.
        final LinkedList<ComplexObject> remainingObjects;
        myBlockingQueue.drainTo(remainingObjects);
        for(ComplexObject complexObject : remainingObjects) {
            this.process(complexObject);
        }
    }

    private void process(final ComplexObject complexObject) {
        // Do something with the complex object.
    }
}

我实际上更愿意以某种方式使队列中毒。如果要杀死线程,请让线程杀死自己。

(很高兴看到有人处理InterruptedException正确。)

这里似乎对中断的处理有些争议。首先,我希望每个人都阅读这篇文章:http
:
//www.ibm.com/developerworks/java/library/j-jtp05236.html

现在,在了解到没人真正读懂这一点的情况下,这就是交易。如果线程InterruptedException在中断时当前正在阻塞,则该线程仅会接收到。在这种情况下,Thread.interrupted()返回false。如果没有阻塞,它将不会收到此异常,而是Thread.interrupted()将返回true。因此,无论如何,循环防护绝对应检查Thread.interrupted()或以其他方式冒失线程中断的风险。

因此,由于Thread.interrupted()无论如何都在进行检查,并且被迫捕获InterruptedException(即使没有被强制处理,也应该对其进行处理),因此现在有两个处理同一事件的代码区域,即线程中断。解决此问题的一种方法是将它们规范化为一种条件,这意味着布尔状态检查可以引发异常,或者异常可以设置布尔状态。我选择稍后。

编辑: 请注意,静态Thread#interrupted方法清除 当前线程的中断状态。



 类似资料:
  • 我编写了一个简单的类,我计划将其扩展为客户端套接字编程应用程序的一部分。类涉及一个BlockingQueue(我从这里复制了代码:相当于Java的BlockingQueue的C++)。当我创建了下面的包装类的一个实例后,我打算让它生成一个单独的线程,该线程只需执行BlockingQueue上阻塞的printer()函数,直到有一个或多个字符串可用,然后它只需将字符串打印到控制台窗口。在我的预期应用

  • 我有一个应用程序,在其中按下开始按钮后,服务将开始轮询几个传感器,每当传感器值发生变化时,将传感器数据存储到某个对象中。每10毫秒,就会发生一次数据库插入,获取对象的当前值并将其存储到数据库中。这会发生30分钟 考虑到插入的速度和持续时间,我想在一个独立于UI线程的线程中运行它,这样导航就不会受到影响。因此,我的服务将通过将数据添加到队列中来为线程提供一些数据,然后另一个线程(消费者)将从队列中取

  • blpop key1...keyN timeout 从左到右扫描返回对第一个非空list进行lpop操作并返回,比如blpop list1 list2 list3 0 ,如果list不存在list2,list3都是非空则对list2做lpop并返回从list2中删除的元素。如果所有的list都是空或不存在,则会阻塞timeout秒,timeout为0表示一直阻塞。当阻塞时,如果有client对ke

  • 下面是两种主要的方法。 代码:

  • 2)Java的内置使用了两个锁:takeLock和putLock,并分别用在put()和take()中,我看到间隔队列是一个链表,不是线程安全的,那怎么行呢?

  • 目前我们有LinkedBlockingQueue和Con的LinkedQueue。 LinkedBlockingQueue可以有界,但它使用锁。 ConcurrentLinkedQueue不使用锁,但它不受限制。而这并不是阻碍投票的原因。 显然,我不能有一个既阻塞又无锁的队列(无等待或非阻塞或其他东西)。我不要求学术定义。 有人知道一个队列实现,它基本上是无锁的(不在热路径中使用锁),空时阻塞(不