当前位置: 首页 > 知识库问答 >
问题:

自定义阵列阻塞队列

戴鸿羲
2023-03-14

下面是两种主要的方法。

public synchronized void put(Integer i) throws InterruptedException {

    if (a.size() == capacity) {
        wait();
    }
    a.add(i);
    notifyAll();
}

public synchronized void take() throws InterruptedException {

    if (a.isEmpty()) {
        wait();
    }
    a.remove(0);
    notifyAll();
}

代码

public class App {

    public static MyBlockingQueue q = new MyBlockingQueue(10);

    // public static ArrayBlockingQueue q = new ArrayBlockingQueue(10);

    public void method1() throws InterruptedException {
        for (int i = 0; i < 20; i++) {
            q.put(i);
            System.out.println(q);
        }
    }

    public void method2() throws InterruptedException {
        for (int i = 0; i < 20; i++) {
            q.take();
        }
    }

    public static void main(String[] args) throws InterruptedException {

        App a = new App();

        ExecutorService executor1 = Executors.newFixedThreadPool(20);
        ExecutorService executor2 = Executors.newFixedThreadPool(20);

        for (int i = 0; i < 2; i++) {

            executor1.submit(new Runnable() {

                public void run() {

                    try {
                        a.method1();
                    } catch (InterruptedException e) {
                        // TODO Auto-generated catch block
                        e.printStackTrace();
                    }
                }
            });
        }

        for (int i = 0; i < 2; i++) {

            executor2.submit(new Runnable() {

                public void run() {

                    try {
                        a.method2();
                    } catch (InterruptedException e) {
                        // TODO Auto-generated catch block
                        e.printStackTrace();
                    }
                }
            });
        }

        executor1.shutdown();
        executor2.shutdown();

        executor1.awaitTermination(1, TimeUnit.DAYS);
        executor2.awaitTermination(2, TimeUnit.DAYS);

        System.out.println();
        System.out.println("The final queue is:");
        System.out.println(App.q);

    }

}

class MyBlockingQueue {

private ArrayList<Integer> a;
    private int capacity;

    public MyBlockingQueue(Integer cap) {
        capacity = cap;
        a = new ArrayList<Integer>(capacity);
    }

    @Override
    public String toString() {
        String output = "";
        for (Integer i : a) {
            output += i.toString() + " ";
        }
        return "[" + output + "]";
    }

    public synchronized void put(Integer i) throws InterruptedException {

        if (a.size() == capacity) {
            wait();
        }
        a.add(i);
        notifyAll();
    }

    public synchronized void take() throws InterruptedException {

        if (a.isEmpty()) {
            wait();
        }
        a.remove(0);
        notifyAll();
    }
}

共有1个答案

刘元青
2023-03-14

问题是所有线程都在同一个监视器上等待,并且会被notifyAll唤醒并同时运行。

您可以使用notify方法唤醒单个线程并进行单个插入或删除。但是在put()内部调用的notify()可以唤醒等待在put()方法中的not full条件下插入的线程:

while (a.size() == capacity) {
        wait(); // many threads waiting
    }

take()内调用的notify()可以唤醒等待not empty条件的线程:

while (a.isEmpty()) {
        wait();
    }

因为所有线程都使用一个监视器,notify可以唤醒任何等待的线程。

因此,您需要两个监视器:一个用于not full条件,一个用于not empty

    Object notFull = new Object();
    Object notEmpty = new Object();

    public synchronized void put(Integer i) throws InterruptedException {

        while (a.size() == capacity) {
            notFull.wait(); 
        }
        a.add(i);
        notEmpty.notify(); //wake up one random thread in take() method
    }

    public synchronized void take() throws InterruptedException {

        if (a.isEmpty()) {
            notEmpty.wait();
        }
        a.remove(0);
        notFull.notify(); // wake up one random thread in put() method
    }

现在,notempty.notify()notfull.notify()不释放由puttake方法中的synchronized关键字获取的锁。

我们需要在同一个锁上同步两个方法,并基于两个条件释放或获取它:未满和未空。为此,有java.util.concurrent.locks.ReentrantLock类:

一种可重入的互斥锁,其基本行为和语义与使用同步方法和语句访问的隐式监视器锁相同,但具有扩展功能

该类表示带条件的锁。其方法[newCondition][2]创建条件:

返回与此锁实例一起使用的条件实例。

条件允许通过await()方法挂起多个线程,通过signal()方法唤醒单个等待线程。没有ReentrantLock就无法使用。调用condition.await时,线程释放在ReentrantLock.lock方法中获取的锁。调用condition.signal时,等待线程获取ReentrantLock。最终代码:

/** Main lock guarding all access */
       private final ReentrantLock lock;
       /** Condition for waiting takes */
       private final Condition notEmpty;
       /** Condition for waiting puts */
       private final Condition notFull;

 public void put(Integer i) throws InterruptedException {
          lock.lockInterruptibly();
          try {
                  while (a.size() == capacity)
                      notFull.await();//realease monitor and sleep until notFull.signal is called

             a.add(i);
            notEmpty.signal();// wake up one random thread in take() method
          } finally {
              lock.unlock();
          }
 }

public void take() throws InterruptedException {
          lock.lockInterruptibly();
          try {
                   while (a.isEmpty())
                      notEmpty.await();//realease monitor and sleep until notEmpty.signal is called

                   a.remove(0);
             notFull.signal();// wake up one random thread in put() method
          } finally {
              lock.unlock();
          }       
}

ReentrantLock确保只有一个线程可以同时执行PutTake方法。condition允许根据条件挂起和恢复线程。

 类似资料:
  • 问题内容: 我在一个非常简单的生产者-消费者场景中使用 java.util.concurrent.BlockingQueue 。例如,此伪代码描述了使用者部分: 到目前为止,一切都很好。在阻塞队列的javadoc中,我读到: BlockingQueue本质上不支持任何类型的“关闭”或“关闭”操作,以指示将不再添加任何项目。这些功能的需求和使用往往取决于实现。例如,一种常见的策略是让生产者插入特殊的

  • blpop key1...keyN timeout 从左到右扫描返回对第一个非空list进行lpop操作并返回,比如blpop list1 list2 list3 0 ,如果list不存在list2,list3都是非空则对list2做lpop并返回从list2中删除的元素。如果所有的list都是空或不存在,则会阻塞timeout秒,timeout为0表示一直阻塞。当阻塞时,如果有client对ke

  • 我编写了一个简单的类,我计划将其扩展为客户端套接字编程应用程序的一部分。类涉及一个BlockingQueue(我从这里复制了代码:相当于Java的BlockingQueue的C++)。当我创建了下面的包装类的一个实例后,我打算让它生成一个单独的线程,该线程只需执行BlockingQueue上阻塞的printer()函数,直到有一个或多个字符串可用,然后它只需将字符串打印到控制台窗口。在我的预期应用

  • 我有一个应用程序,在其中按下开始按钮后,服务将开始轮询几个传感器,每当传感器值发生变化时,将传感器数据存储到某个对象中。每10毫秒,就会发生一次数据库插入,获取对象的当前值并将其存储到数据库中。这会发生30分钟 考虑到插入的速度和持续时间,我想在一个独立于UI线程的线程中运行它,这样导航就不会受到影响。因此,我的服务将通过将数据添加到队列中来为线程提供一些数据,然后另一个线程(消费者)将从队列中取

  • 2)Java的内置使用了两个锁:takeLock和putLock,并分别用在put()和take()中,我看到间隔队列是一个链表,不是线程安全的,那怎么行呢?

  • 目前我们有LinkedBlockingQueue和Con的LinkedQueue。 LinkedBlockingQueue可以有界,但它使用锁。 ConcurrentLinkedQueue不使用锁,但它不受限制。而这并不是阻碍投票的原因。 显然,我不能有一个既阻塞又无锁的队列(无等待或非阻塞或其他东西)。我不要求学术定义。 有人知道一个队列实现,它基本上是无锁的(不在热路径中使用锁),空时阻塞(不