刚刚开始学习多线程。我在多个线程中有5个生产者和2个消费者。基本上,该程序将100个项目添加到队列中。当队列大小为100时,生产者将停止添加。我希望消费者在消费者从队列中删除所有项目时通知生产者,以便生产者可以再次开始添加。当前,生产者将等待,但永远不会收到消费者的通知。
制片人:
public class Producer implements Runnable {
private BlockingQueue sharedQueue;
private final int queueSize;
private Object lock = new Object();
public Producer(BlockingQueue sharedQueue, int queueSize){
this.sharedQueue = sharedQueue;
this.queueSize = queueSize;
}
public void run() {
while(true) {
if(sharedQueue.size()== queueSize){
try {
synchronized (lock) {
sharedQueue.wait();
}
} catch (InterruptedException e) {
e.printStackTrace();
}
}
try {
sharedQueue.put("Producer: " + sharedQueue.size());
Thread.sleep(500);
System.out.println("Producer: Queue Size " + sharedQueue.size() + " Current Thread " + Thread.currentThread());
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
}
消费者:
public class Consumer implements Runnable{
private BlockingQueue sharedQueue;
private final int queueSize;
private final int queueEmpty=0;
private Object lock = new Object();
public Consumer(BlockingQueue sharedQueue, int queueSize){
this.sharedQueue = sharedQueue;
this.queueSize = queueSize;
}
//Notify awaiting thread if the sharedQueue is empty
public void run() {
while (true) {
if(sharedQueue.size()==queueEmpty){
synchronized (lock) {
this.notifyAll();
}
}
try {
sharedQueue.take();
Thread.sleep(800);
System.out.println("Consumer: Queue Size " + sharedQueue.size() + " Current Thread" + Thread.currentThread());
}catch(InterruptedException e){
e.printStackTrace();
}
}
}
}
主班
public class App{
//A simple program to illustrate how producer and consumer pattern works with blocking queue using executor service
public static void main( String[] args )
{
final BlockingQueue<String> sharedQueue = new ArrayBlockingQueue<String> (100);
final int queueSize =100;
final int producerNum = 5;
final int consumerNum = 2;
final ExecutorService executorProducer = Executors.newFixedThreadPool(producerNum);
final ExecutorService executorConsumer = Executors.newFixedThreadPool(consumerNum);
for(int i=0;i<producerNum;i++){
Producer producer = new Producer(sharedQueue,queueSize);
executorProducer.execute(producer);
}
for(int j=0;j<consumerNum;j++){
Consumer consumer = new Consumer(sharedQueue,queueSize);
executorConsumer.execute(consumer);
}
}
}
从oracle文档页面:
BlockingQueue实现是线程安全的。所有排队方法都可以使用内部锁或其他形式的并发控制原子地实现其效果
由于您已经在使用BlockingQueues
,因此可以删除wait()
和notify()
API。
使用的多个生产者和消费者的示例代码BlockingQueue
:
import java.util.concurrent.*;
public class ProducerConsumerDemo {
public static void main(String args[]){
BlockingQueue<Integer> sharedQueue = new LinkedBlockingQueue<Integer>();
Thread prodThread1 = new Thread(new Producer(sharedQueue,1));
Thread prodThread2 = new Thread(new Producer(sharedQueue,2));
Thread consThread1 = new Thread(new Consumer(sharedQueue,1));
Thread consThread2 = new Thread(new Consumer(sharedQueue,2));
prodThread1.start();
prodThread2.start();
consThread1.start();
consThread2.start();
}
}
class Producer implements Runnable {
private final BlockingQueue<Integer> sharedQueue;
private int threadNo;
public Producer(BlockingQueue<Integer> sharedQueue,int threadNo) {
this.threadNo = threadNo;
this.sharedQueue = sharedQueue;
}
@Override
public void run() {
for(int i=1; i<= 5; i++){
try {
int number = i+(10*threadNo);
System.out.println("Produced:" + number + ":by thread:"+ threadNo);
sharedQueue.put(number);
} catch (Exception err) {
err.printStackTrace();
}
}
}
}
class Consumer implements Runnable{
private final BlockingQueue<Integer> sharedQueue;
private int threadNo;
public Consumer (BlockingQueue<Integer> sharedQueue,int threadNo) {
this.sharedQueue = sharedQueue;
this.threadNo = threadNo;
}
@Override
public void run() {
while(true){
try {
int num = sharedQueue.take();
System.out.println("Consumed: "+ num + ":by thread:"+threadNo);
} catch (Exception err) {
err.printStackTrace();
}
}
}
}
它是如何工作的?
BlockingQueue
BlockingQueue
BlockingQueue
(此示例中为Integer)读取值样本输出:
Produced:21:by thread:2
Produced:11:by thread:1
Produced:12:by thread:1
Produced:13:by thread:1
Produced:14:by thread:1
Produced:22:by thread:2
Produced:23:by thread:2
Produced:24:by thread:2
Produced:25:by thread:2
Consumed: 21:by thread:1
Consumed: 12:by thread:1
Consumed: 13:by thread:1
Consumed: 14:by thread:1
Consumed: 22:by thread:1
Consumed: 23:by thread:1
Consumed: 24:by thread:1
Consumed: 25:by thread:1
Produced:15:by thread:1
Consumed: 11:by thread:2
Consumed: 15:by thread:1
我有一个生产者-消费者模式的多线程任务。可能有许多生产者和一个消费者。我使用ArrayBlockingQueue作为共享资源。 Producer类中的run()方法: Consumer类中的run()方法: main()方法: 现在,当队列为空时,我有消费者结束条件。但是可能会有一段时间队列变成空的,但是一些生产者线程仍然在工作。所以我只需要在完成所有生产者线程之后才完成消费者线程(但它们的数量事
下面是代码,我面临的问题是recordRead变量告诉线程应该从哪里开始读取记录的起点。但是我如何为每个线程设置不同的值?例如,对于thread1,它应该是0,recordsToRead应该是300,对于thread2,recordsToRead应该是300+300=600,对于最后一个线程,它应该是600以及更高的结束。pagesize=50pagesize、recordRead和recordT
问题内容: 我对于如何使用特定的生产者-消费者模式感到困惑,在该模式中,生产者和消费者都可以同时并独立地进行操作。 首先,考虑以下示例,该示例紧随docs中的示例: 关于此脚本,有一个更详细的细节:通过常规的for循环将项目同步放入队列。 我的目标是创建一个使用(或)和的脚本。两者都应安排为同时运行。没有一个消费者协程明确地与生产者绑定或链接。 我如何修改上面的程序,以便生产者是可以与消费者/工人
我应该在两个子进程中获得信号量ID…还是缺少其他东西。
问题内容: 我想创建某种线程应用程序。但是我不确定在两者之间实现队列的最佳方法是什么。 因此,我提出了两个想法(这两个想法可能都是完全错误的)。我想知道哪种更好,如果它们都烂了,那么实现队列的最佳方法是什么。我关心的主要是这些示例中队列的实现。我正在扩展一个内部类的Queue类,它是线程安全的。下面是两个示例,每个示例有4个类。 主班 消费阶层 生产者类别 队列类 要么 主班 消费阶层 生产者类别