当前位置: 首页 > 面试题库 >

关于使用阻塞队列方法的Java生产者和消费者模式

吕文林
2023-03-14
问题内容

我正在研究有关Java中线程的生产者和消费者设计模式,最近我在Java 5中进行了探索,引入Java
5中引入了BlockingQueue数据结构。现在,它变得更加简单,因为BlockingQueue通过引入阻塞方法隐式地提供了此控件。
put()和take()。现在,您无需使用等待和通知即可在生产者和消费者之间进行通信。如果有界队列,则如果Queue已满,BlockingQueue
put()方法将阻塞;如果Queue为空,则take()方法将阻塞。在下一节中,我们将看到Producer
Consumer设计模式的代码示例。我已经开发了以下程序,但也请让我知道waut()和notify()的旧方法,我也想使用旧方法开发相同的逻辑

民间请告知如何实现,经典方式是使用wait()和notify()方法在Producer和Consumer线程之间进行通信,并在诸如完整队列和空队列之类的个别条件下阻止它们中的每一个…?

    import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.logging.Level;
import java.util.logging.Logger;

public class ProducerConsumerPattern {

    public static void main(String args[]){

     //Creating shared object
     BlockingQueue sharedQueue = new LinkedBlockingQueue();

     //Creating Producer and Consumer Thread
     Thread prodThread = new Thread(new Producer(sharedQueue));
     Thread consThread = new Thread(new Consumer(sharedQueue));

     //Starting producer and Consumer thread
     prodThread.start();
     consThread.start();
    }

}

//Producer Class in java
class Producer implements Runnable {

    private final BlockingQueue sharedQueue;

    public Producer(BlockingQueue sharedQueue) {
        this.sharedQueue = sharedQueue;
    }

    @Override
    public void run() {
        for(int i=0; i<10; i++){
            try {
                System.out.println("Produced: " + i);
                sharedQueue.put(i);
            } catch (InterruptedException ex) {
                Logger.getLogger(Producer.class.getName()).log(Level.SEVERE, null, ex);
            }
        }
    }

}

//Consumer Class in Java
class Consumer implements Runnable{

    private final BlockingQueue sharedQueue;

    public Consumer (BlockingQueue sharedQueue) {
        this.sharedQueue = sharedQueue;
    }

    @Override
    public void run() {
        while(true){
            try {
                System.out.println("Consumed: "+ sharedQueue.take());
            } catch (InterruptedException ex) {
                Logger.getLogger(Consumer.class.getName()).log(Level.SEVERE, null, ex);
            }
        }
    }


}

Output:
Produced: 0
Produced: 1
Consumed: 0
Produced: 2
Consumed: 1
Produced: 3
Consumed: 2
Produced: 4
Consumed: 3
Produced: 5
Consumed: 4
Produced: 6
Consumed: 5
Produced: 7
Consumed: 6
Produced: 8
Consumed: 7
Produced: 9
Consumed: 8
Consumed: 9

问题答案:

我可以在睡觉时(或至少我认为可以)进行等待通知。Java
1.4源提供了所有这些的漂亮示例,但是他们已经切换到使用原子来完成所有事情,并且现在变得更加复杂。尽管其他方法可以使您免受并发危险的困扰,并且可以简化代码,但wait-
notify确实提供了灵活性和强大的功能。

为此,您需要一些字段,例如:

private final ConcurrentLinkedQueue<Intger>  sharedQueue =
                                                    new ConcurrentLinkedQueue<>();
private volatile   boolean  waitFlag = true;

您的Producer.run看起来像这样:

public void run()  {
    for (int i = 0; i < 100000, i++)  {
        System.out.println( "Produced: " + i );
        sharedQueue.add( new Integer( i ) );
        if (waitFlag)       // volatile access is cheaper than synch.
            synchronized (sharedQueue)  { sharedQueue.notifyAll(); }
    }
}

和Consumer.run:

public void run()  {
    waitFlag = false;
    for (;;)  {
        Integer  ic = sharedQueue.poll();
        if (ic == null)  {
            synchronized (sharedQueue)  {
                waitFlag = true;
                // An add might have come through before waitFlag was set.
                ic = sharedQueue.poll();
                if (ic == null)  {
                    try  { sharedQueue.wait(); }
                    catch (InterruptedException ex)  {}
                    waitFlag = false;
                    continue;
                }
                waitFlag = true;
            }
        }
        System.out.println( "Consumed: " + ic );
    }
}

这样可以使同步保持在最低水平。如果一切顺利,那么每次添加时只有一个不稳定的字段可以看一看。您应该能够同时运行任意数量的生产者。(消费者会比较棘手-
waitFlag您必须放弃。)您可以将另一个对象用于wait / notifyAll。



 类似资料:
  • 我试图用阻塞队列实现一些消费者-生产者问题。为了达到某种目的,我决定编写文件搜索工具。 我认为搜索机制是递归工作的,每个新目录都将有新的线程池来提高搜索速度。 我的问题是,我不知道如何实现最终停止打印线程(消费者)的机制——当搜索线程完成工作时。 我试图用一些想法来做到这一点,比如毒丸,但它效果不佳(线程在打印任何结果之前停止)。任何想法我该怎么做? 下面是一些代码: 搜索机制: } 打印机: }

  • 问题内容: 我正在为标准Java系统工作,对生产者来说,这有严格的时序要求(1/100秒的毫秒)。 我有一个生产者将内容放置在阻塞队列中,然后一个消费者使用了该内容并将其转储到文件中。当数据不可用时,使用者将阻塞。 显然,阻塞队列是合适的接口,但是如果我想 最小化生产者的成本, 我应该选择哪种实际实现?当我将内容放入队列时,我希望尽可能少地进行诸如锁定和分配之类的事情,而且我不介意消费者是否需要等

  • 问题内容: 我想创建某种线程应用程序。但是我不确定在两者之间实现队列的最佳方法是什么。 因此,我提出了两个想法(这两个想法可能都是完全错误的)。我想知道哪种更好,如果它们都烂了,那么实现队列的最佳方法是什么。我关心的主要是这些示例中队列的实现。我正在扩展一个内部类的Queue类,它是线程安全的。下面是两个示例,每个示例有4个类。 主班 消费阶层 生产者类别 队列类 要么 主班 消费阶层 生产者类别

  • 生产者线程与消费者线程使用信号量同步 生产者线程与消费者线程使用信号量同步 源码/* * Copyright (c) 2006-2018, RT-Thread Development Team * * SPDX-License-Identifier: Apache-2.0 * * Change Logs: * Date Author Notes * 2018-08-24 yangjie the f

  • 一、线程间通信的两种方式 1.wait()/notify() Object类中相关的方法有notify方法和wait方法。因为wait和notify方法定义在Object类中,因此会被所有的类所继承。这些方法都是final的,即它们都是不能被重写的,不能通过子类覆写去改变它们的行为。 ①wait()方法: 让当前线程进入等待,并释放锁。 ②wait(long)方法: 让当前线程进入等待,并释放锁,

  • 我有一个消费者作为生产者消费者模式的一部分: 简化: 如果我移除 通过将线程设置为睡眠,CPU使用率攀升到极高的水平(13%),而不是0%。 此外,如果我实例化该类的多个实例,则每个实例的CPU使用率都会以13%的增量攀升。 大约每分钟(可能每30秒)都会向BlockingCollection添加一个新的LogItem,并将适用的消息写入文件。 有没有可能线程以某种方式阻止了其他线程的运行,而系统