我正在阅读blockingqueue、executoreserivce和producer-consumer范式。我希望有一个不断变化的生产者和不断变化的消费者。每个生产者将追加到队列中,消费者将消费消息并处理它们。我有的问题是--生产者怎么知道消费者做了,没有更多的消息会进入队列?我想在主线程中添加一个计数器。当一个生产者启动时,我将增加计数器,当每个生产者结束时,他们将减少int。我的消费者将能够知道计数器,当它达到0,并且队列中没有更多的元素时,他们可以死亡。
在同步工作方面的另一个常见问题-主线程是否应该读取队列的内容,并为每个消息添加执行器,或者让线程知道这个逻辑并自行决定何时结束是最佳实践?
当系统启动时,我收到一个数字,决定有多少生产者将启动。每个生产者将生成一组随机的数字到队列中。消费者将把这些数字打印到日志中。我现在的问题是,一旦我知道最后一个生产商把最后一个数字推进来了,我还是不明白如何让消费者知道不会有更多的数字进来,他们应该关闭。
消费者怎么知道生产者做了什么?
这个问题的一个很好的解决方案是使用PoisonPill模式。这里有一个例子说明它是如何工作的。在这种情况下,您需要知道的只是制作者的数量。
编辑:我更新了代码,以便在最后一个使用者完成工作时清除队列。
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.atomic.AtomicInteger;
public class PoisonPillsTests {
interface Message {
}
interface PoisonPill extends Message {
PoisonPill INSTANCE = new PoisonPill() {
};
}
static class TextMessage implements Message {
private final String text;
public TextMessage(String text) {
this.text = text;
}
public String getText() {
return text;
}
@Override
public String toString() {
return text;
}
}
static class Producer implements Runnable {
private final String producerName;
private final AtomicInteger producersCount;
private final BlockingQueue<Message> messageBlockingQueue;
public Producer(String producerName, BlockingQueue<Message> messageBlockingQueue, AtomicInteger producersCount) {
this.producerName = producerName;
this.messageBlockingQueue = messageBlockingQueue;
this.producersCount = producersCount;
}
@Override
public void run() {
try {
for (int i = 0; i < 100; i++) {
messageBlockingQueue.put(new TextMessage("Producer " + producerName + " message " + i));
}
if (producersCount.decrementAndGet() <= 0) {
//we need this producersCount so that the producers to produce a single poison pill
messageBlockingQueue.put(PoisonPill.INSTANCE);
}
} catch (InterruptedException e) {
throw new RuntimeException("Producer interrupted", e);
}
}
}
static class Consumer implements Runnable {
private final AtomicInteger consumersCount;
private final AtomicInteger consumedMessages;
private final BlockingQueue<Message> messageBlockingQueue;
public Consumer(BlockingQueue<Message> messageBlockingQueue, AtomicInteger consumersCount, AtomicInteger consumedMessages) {
this.messageBlockingQueue = messageBlockingQueue;
this.consumersCount = consumersCount;
this.consumedMessages = consumedMessages;
}
@Override
public void run() {
try {
while (true) {
Message message = null;
message = messageBlockingQueue.take();
if (message instanceof PoisonPill) {
//we put back the poison pill so that to be consumed by the next consumer
messageBlockingQueue.put(message);
break;
} else {
consumedMessages.incrementAndGet();
System.out.println("Consumer got message " + message);
}
}
} catch (InterruptedException e) {
throw new RuntimeException("Consumer interrupted", e);
} finally {
if (consumersCount.decrementAndGet() <= 0) {
System.out.println("Last consumer, clearing the queue");
messageBlockingQueue.clear();
}
}
}
}
public static void main(String[] args) {
final AtomicInteger producerCount = new AtomicInteger(4);
final AtomicInteger consumersCount = new AtomicInteger(2);
final AtomicInteger consumedMessages = new AtomicInteger();
BlockingQueue<Message> messageBlockingQueue = new LinkedBlockingQueue<>();
List<CompletableFuture<Void>> tasks = new ArrayList<>();
for (int i = 0; i < producerCount.get(); i++) {
tasks.add(CompletableFuture.runAsync(new Producer("" + (i + 1), messageBlockingQueue, producerCount)));
}
for (int i = 0; i < consumersCount.get(); i++) {
tasks.add(CompletableFuture.runAsync(new Consumer(messageBlockingQueue, consumersCount, consumedMessages)));
}
CompletableFuture.allOf(tasks.toArray(new CompletableFuture[0])).join();
System.out.println("Consumed " + consumedMessages + " messages");
}
}
问题内容: 我正在寻找一种方法来阻止直到a 为空。 我知道,在多线程环境中,只要有生产者将项目放入,就可能会出现队列变空并且几秒钟后队列中充满项目的情况。 但是,如果只有 一个 生产者,则它可能要等待(并阻止)直到队列停止为空后再将其放入队列。 Java /伪代码: 你有什么主意吗? 编辑 :我知道包装和使用额外的条件可以解决问题,我只是问是否有一些预制的解决方案和/或更好的选择。 问题答案: 使
java.util.concurrent.BlockingQueue接口是Queue接口的子接口,并且还支持诸如在检索元素之前等待队列变为非空的操作,并在存储元素之前等待队列中的空间可用。 BlockingQueue方法 Sr.No. 方法和描述 1 boolean add(E e) 如果可以在不违反容量限制的情况下立即执行此操作,则将指定的元素插入此队列,成功时返回true,如果当前没有可用空间
问题内容: 我正在使用SqlBulkCopy从xml备份还原表。其中一个表备份约为200MB,并且有很多记录。 我有错误: 问题答案: 您可能需要增加超时时间。尝试从默认值30秒增加该值。
问题内容: 从PHP运行长查询时, [如果用户在浏览器中按Stop,我如何杀死查询? 考虑到我不能调用任何其他PHP函数,因为在等待MySQL时PHP被阻止了。 另外,由于会话锁定,我无法再通过Ajax向服务器发出任何请求。 因此,一种解决方案可能是: 忽略用户中止 在后台运行长查询,并让PHP每100毫秒检查一次是否完成 从查询中获取pid 如果用户中止,请杀死该pid 否则返回结果 我不知道该
问题内容: 我看到许多连接处于打开状态,并且很长一段时间(例如5分钟)保持空闲状态。 有什么解决方案可以在不重新启动mysql服务的情况下从服务器终止/关闭它? 我正在维护旧的PHP系统,无法关闭为执行查询而建立的连接。 我应该将my.cnf文件中的超时值减少为默认的8小时吗? 问题答案: 手动清理: 您可以杀死该进程ID。 但: php应用程序可能会报告错误(或网络服务器,请检查错误日志) 不要
我的程序很简单。我有一个有2个方法的类。我有另一个类在那里调用这两个方法。当调用第一个方法时,初始化arraylist并填充一些对象,然后调用第二个方法,简单地告诉arraylist的大小并打印出来。我不明白为什么我总是得到一个空的时候,第二个方法被调用。然而,当我在第一个方法中打印数组时,我得到了大小和所有it元素。Arraylist是公共得..请看一下代码。 在第一班