我在一个非常简单的生产者-消费者场景中使用 java.util.concurrent.BlockingQueue 。例如,此伪代码描述了使用者部分:
class QueueConsumer implements Runnable {
@Override
public void run() {
while(true)
{
try {
ComplexObject complexObject = myBlockingQueue.take();
//do something with the complex object
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
}
}
}
到目前为止,一切都很好。在阻塞队列的javadoc中,我读到:
BlockingQueue本质上不支持任何类型的“关闭”或“关闭”操作,以指示将不再添加任何项目。这些功能的需求和使用往往取决于实现。例如,一种常见的策略是让生产者插入特殊的流尾对象或有毒对象,这些特殊对象在被消费者拿走时会被相应地解释。
不幸的是,由于使用了泛型以及ComplexObject的性质,将“毒物”推入队列并不是一件容易的事。因此,在我的情况下,这种“通用策略”并不是很方便。
我的问题是:我还可以使用哪些其他好的策略/模式来“关闭”队列?
谢谢!
如果您拥有使用者线程的句柄,则可以中断它。使用您提供的代码,将杀死消费者。我不希望制片人有这个。它可能必须以某种方式回调到程序控制器,以使其知道已完成。然后,控制器将中断使用者线程。
您总是可以在服从中断之前完成工作。例如:
class QueueConsumer implements Runnable {
@Override
public void run() {
while(!(Thread.currentThread().isInterrupted())) {
try {
final ComplexObject complexObject = myBlockingQueue.take();
this.process(complexObject);
} catch (InterruptedException e) {
// Set interrupted flag.
Thread.currentThread().interrupt();
}
}
// Thread is getting ready to die, but first,
// drain remaining elements on the queue and process them.
final LinkedList<ComplexObject> remainingObjects;
myBlockingQueue.drainTo(remainingObjects);
for(ComplexObject complexObject : remainingObjects) {
this.process(complexObject);
}
}
private void process(final ComplexObject complexObject) {
// Do something with the complex object.
}
}
我实际上更愿意以某种方式使队列中毒。如果要杀死线程,请让线程杀死自己。
(很高兴看到有人处理InterruptedException
正确。)
这里似乎对中断的处理有些争议。首先,我希望每个人都阅读这篇文章:http
:
//www.ibm.com/developerworks/java/library/j-jtp05236.html
现在,在了解到没人真正读懂这一点的情况下,这就是交易。如果线程InterruptedException
在中断时当前正在阻塞,则该线程仅会接收到。在这种情况下,Thread.interrupted()
将返回false
。如果没有阻塞,它将不会收到此异常,而是Thread.interrupted()
将返回true
。因此,无论如何,循环防护绝对应检查Thread.interrupted()
或以其他方式冒失线程中断的风险。
因此,由于Thread.interrupted()
无论如何都在进行检查,并且被迫捕获InterruptedException
(即使没有被强制处理,也应该对其进行处理),因此现在有两个处理同一事件的代码区域,即线程中断。解决此问题的一种方法是将它们规范化为一种条件,这意味着布尔状态检查可以引发异常,或者异常可以设置布尔状态。我选择稍后。
编辑: 请注意,静态Thread#interrupted方法将 清除 当前线程的中断状态。
我编写了一个简单的类,我计划将其扩展为客户端套接字编程应用程序的一部分。类涉及一个BlockingQueue(我从这里复制了代码:相当于Java的BlockingQueue的C++)。当我创建了下面的包装类的一个实例后,我打算让它生成一个单独的线程,该线程只需执行BlockingQueue上阻塞的printer()函数,直到有一个或多个字符串可用,然后它只需将字符串打印到控制台窗口。在我的预期应用
我有一个应用程序,在其中按下开始按钮后,服务将开始轮询几个传感器,每当传感器值发生变化时,将传感器数据存储到某个对象中。每10毫秒,就会发生一次数据库插入,获取对象的当前值并将其存储到数据库中。这会发生30分钟 考虑到插入的速度和持续时间,我想在一个独立于UI线程的线程中运行它,这样导航就不会受到影响。因此,我的服务将通过将数据添加到队列中来为线程提供一些数据,然后另一个线程(消费者)将从队列中取
blpop key1...keyN timeout 从左到右扫描返回对第一个非空list进行lpop操作并返回,比如blpop list1 list2 list3 0 ,如果list不存在list2,list3都是非空则对list2做lpop并返回从list2中删除的元素。如果所有的list都是空或不存在,则会阻塞timeout秒,timeout为0表示一直阻塞。当阻塞时,如果有client对ke
下面是两种主要的方法。 代码:
2)Java的内置使用了两个锁:takeLock和putLock,并分别用在put()和take()中,我看到间隔队列是一个链表,不是线程安全的,那怎么行呢?
目前我们有LinkedBlockingQueue和Con的LinkedQueue。 LinkedBlockingQueue可以有界,但它使用锁。 ConcurrentLinkedQueue不使用锁,但它不受限制。而这并不是阻碍投票的原因。 显然,我不能有一个既阻塞又无锁的队列(无等待或非阻塞或其他东西)。我不要求学术定义。 有人知道一个队列实现,它基本上是无锁的(不在热路径中使用锁),空时阻塞(不