下面是两种主要的方法。
public synchronized void put(Integer i) throws InterruptedException {
if (a.size() == capacity) {
wait();
}
a.add(i);
notifyAll();
}
public synchronized void take() throws InterruptedException {
if (a.isEmpty()) {
wait();
}
a.remove(0);
notifyAll();
}
代码:
public class App {
public static MyBlockingQueue q = new MyBlockingQueue(10);
// public static ArrayBlockingQueue q = new ArrayBlockingQueue(10);
public void method1() throws InterruptedException {
for (int i = 0; i < 20; i++) {
q.put(i);
System.out.println(q);
}
}
public void method2() throws InterruptedException {
for (int i = 0; i < 20; i++) {
q.take();
}
}
public static void main(String[] args) throws InterruptedException {
App a = new App();
ExecutorService executor1 = Executors.newFixedThreadPool(20);
ExecutorService executor2 = Executors.newFixedThreadPool(20);
for (int i = 0; i < 2; i++) {
executor1.submit(new Runnable() {
public void run() {
try {
a.method1();
} catch (InterruptedException e) {
// TODO Auto-generated catch block
e.printStackTrace();
}
}
});
}
for (int i = 0; i < 2; i++) {
executor2.submit(new Runnable() {
public void run() {
try {
a.method2();
} catch (InterruptedException e) {
// TODO Auto-generated catch block
e.printStackTrace();
}
}
});
}
executor1.shutdown();
executor2.shutdown();
executor1.awaitTermination(1, TimeUnit.DAYS);
executor2.awaitTermination(2, TimeUnit.DAYS);
System.out.println();
System.out.println("The final queue is:");
System.out.println(App.q);
}
}
class MyBlockingQueue {
private ArrayList<Integer> a;
private int capacity;
public MyBlockingQueue(Integer cap) {
capacity = cap;
a = new ArrayList<Integer>(capacity);
}
@Override
public String toString() {
String output = "";
for (Integer i : a) {
output += i.toString() + " ";
}
return "[" + output + "]";
}
public synchronized void put(Integer i) throws InterruptedException {
if (a.size() == capacity) {
wait();
}
a.add(i);
notifyAll();
}
public synchronized void take() throws InterruptedException {
if (a.isEmpty()) {
wait();
}
a.remove(0);
notifyAll();
}
}
问题是所有线程都在同一个监视器上等待,并且会被notifyAll唤醒并同时运行。
您可以使用notify
方法唤醒单个线程并进行单个插入或删除。但是在put()
内部调用的notify()
可以唤醒等待在put()
方法中的not full
条件下插入的线程:
while (a.size() == capacity) {
wait(); // many threads waiting
}
在take()
内调用的notify()
可以唤醒等待not empty
条件的线程:
while (a.isEmpty()) {
wait();
}
因为所有线程都使用一个监视器,notify
可以唤醒任何等待的线程。
因此,您需要两个监视器:一个用于not full
条件,一个用于not empty
。
Object notFull = new Object();
Object notEmpty = new Object();
public synchronized void put(Integer i) throws InterruptedException {
while (a.size() == capacity) {
notFull.wait();
}
a.add(i);
notEmpty.notify(); //wake up one random thread in take() method
}
public synchronized void take() throws InterruptedException {
if (a.isEmpty()) {
notEmpty.wait();
}
a.remove(0);
notFull.notify(); // wake up one random thread in put() method
}
现在,notempty.notify()
和notfull.notify()
不释放由put
或take
方法中的synchronized
关键字获取的锁。
我们需要在同一个锁上同步两个方法,并基于两个条件释放或获取它:未满和未空。为此,有java.util.concurrent.locks.ReentrantLock
类:
一种可重入的互斥锁,其基本行为和语义与使用同步方法和语句访问的隐式监视器锁相同,但具有扩展功能。
该类表示带条件的锁。其方法[newCondition][2]
创建条件:
返回与此锁实例一起使用的条件实例。
条件允许通过await()
方法挂起多个线程,通过signal()
方法唤醒单个等待线程。没有ReentrantLock就无法使用。调用condition.await
时,线程释放在ReentrantLock.lock
方法中获取的锁。调用condition.signal
时,等待线程获取ReentrantLock
。最终代码:
/** Main lock guarding all access */
private final ReentrantLock lock;
/** Condition for waiting takes */
private final Condition notEmpty;
/** Condition for waiting puts */
private final Condition notFull;
public void put(Integer i) throws InterruptedException {
lock.lockInterruptibly();
try {
while (a.size() == capacity)
notFull.await();//realease monitor and sleep until notFull.signal is called
a.add(i);
notEmpty.signal();// wake up one random thread in take() method
} finally {
lock.unlock();
}
}
public void take() throws InterruptedException {
lock.lockInterruptibly();
try {
while (a.isEmpty())
notEmpty.await();//realease monitor and sleep until notEmpty.signal is called
a.remove(0);
notFull.signal();// wake up one random thread in put() method
} finally {
lock.unlock();
}
}
ReentrantLock
确保只有一个线程可以同时执行Put
或Take
方法。condition
允许根据条件挂起和恢复线程。
问题内容: 我在一个非常简单的生产者-消费者场景中使用 java.util.concurrent.BlockingQueue 。例如,此伪代码描述了使用者部分: 到目前为止,一切都很好。在阻塞队列的javadoc中,我读到: BlockingQueue本质上不支持任何类型的“关闭”或“关闭”操作,以指示将不再添加任何项目。这些功能的需求和使用往往取决于实现。例如,一种常见的策略是让生产者插入特殊的
blpop key1...keyN timeout 从左到右扫描返回对第一个非空list进行lpop操作并返回,比如blpop list1 list2 list3 0 ,如果list不存在list2,list3都是非空则对list2做lpop并返回从list2中删除的元素。如果所有的list都是空或不存在,则会阻塞timeout秒,timeout为0表示一直阻塞。当阻塞时,如果有client对ke
我编写了一个简单的类,我计划将其扩展为客户端套接字编程应用程序的一部分。类涉及一个BlockingQueue(我从这里复制了代码:相当于Java的BlockingQueue的C++)。当我创建了下面的包装类的一个实例后,我打算让它生成一个单独的线程,该线程只需执行BlockingQueue上阻塞的printer()函数,直到有一个或多个字符串可用,然后它只需将字符串打印到控制台窗口。在我的预期应用
我有一个应用程序,在其中按下开始按钮后,服务将开始轮询几个传感器,每当传感器值发生变化时,将传感器数据存储到某个对象中。每10毫秒,就会发生一次数据库插入,获取对象的当前值并将其存储到数据库中。这会发生30分钟 考虑到插入的速度和持续时间,我想在一个独立于UI线程的线程中运行它,这样导航就不会受到影响。因此,我的服务将通过将数据添加到队列中来为线程提供一些数据,然后另一个线程(消费者)将从队列中取
2)Java的内置使用了两个锁:takeLock和putLock,并分别用在put()和take()中,我看到间隔队列是一个链表,不是线程安全的,那怎么行呢?
目前我们有LinkedBlockingQueue和Con的LinkedQueue。 LinkedBlockingQueue可以有界,但它使用锁。 ConcurrentLinkedQueue不使用锁,但它不受限制。而这并不是阻碍投票的原因。 显然,我不能有一个既阻塞又无锁的队列(无等待或非阻塞或其他东西)。我不要求学术定义。 有人知道一个队列实现,它基本上是无锁的(不在热路径中使用锁),空时阻塞(不