当前位置: 首页 > 知识库问答 >
问题:

blockingqueue为空时终止使用者

陶瀚玥
2023-03-14

我正在阅读blockingqueue、executoreserivce和producer-consumer范式。我希望有一个不断变化的生产者和不断变化的消费者。每个生产者将追加到队列中,消费者将消费消息并处理它们。我有的问题是--生产者怎么知道消费者做了,没有更多的消息会进入队列?我想在主线程中添加一个计数器。当一个生产者启动时,我将增加计数器,当每个生产者结束时,他们将减少int。我的消费者将能够知道计数器,当它达到0,并且队列中没有更多的元素时,他们可以死亡。

在同步工作方面的另一个常见问题-主线程是否应该读取队列的内容,并为每个消息添加执行器,或者让线程知道这个逻辑并自行决定何时结束是最佳实践?

当系统启动时,我收到一个数字,决定有多少生产者将启动。每个生产者将生成一组随机的数字到队列中。消费者将把这些数字打印到日志中。我现在的问题是,一旦我知道最后一个生产商把最后一个数字推进来了,我还是不明白如何让消费者知道不会有更多的数字进来,他们应该关闭。

消费者怎么知道生产者做了什么?

共有1个答案

程皓轩
2023-03-14

这个问题的一个很好的解决方案是使用PoisonPill模式。这里有一个例子说明它是如何工作的。在这种情况下,您需要知道的只是制作者的数量。

编辑:我更新了代码,以便在最后一个使用者完成工作时清除队列。

import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.atomic.AtomicInteger;

public class PoisonPillsTests {

    interface Message {

    }

    interface PoisonPill extends Message {
        PoisonPill INSTANCE = new PoisonPill() {
        };
    }

    static class TextMessage implements Message {

        private final String text;

        public TextMessage(String text) {
            this.text = text;
        }

        public String getText() {
            return text;
        }

        @Override
        public String toString() {
            return text;
        }
    }

    static class Producer implements Runnable {

        private final String producerName;
        private final AtomicInteger producersCount;
        private final BlockingQueue<Message> messageBlockingQueue;

        public Producer(String producerName, BlockingQueue<Message> messageBlockingQueue, AtomicInteger producersCount) {
            this.producerName = producerName;
            this.messageBlockingQueue = messageBlockingQueue;
            this.producersCount = producersCount;
        }

        @Override
        public void run() {
            try {
                for (int i = 0; i < 100; i++) {
                    messageBlockingQueue.put(new TextMessage("Producer " + producerName + " message " + i));
                }
                if (producersCount.decrementAndGet() <= 0) {
                    //we need this producersCount so that the producers to produce a single poison pill
                    messageBlockingQueue.put(PoisonPill.INSTANCE);
                }
            } catch (InterruptedException e) {
                throw new RuntimeException("Producer interrupted", e);
            }
        }
    }

    static class Consumer implements Runnable {

        private final AtomicInteger consumersCount;
        private final AtomicInteger consumedMessages;
        private final BlockingQueue<Message> messageBlockingQueue;

        public Consumer(BlockingQueue<Message> messageBlockingQueue, AtomicInteger consumersCount, AtomicInteger consumedMessages) {
            this.messageBlockingQueue = messageBlockingQueue;
            this.consumersCount = consumersCount;
            this.consumedMessages = consumedMessages;
        }

        @Override
        public void run() {
            try {
                while (true) {
                    Message message = null;
                    message = messageBlockingQueue.take();

                    if (message instanceof PoisonPill) {
                        //we put back the poison pill so that to be consumed by the next consumer
                        messageBlockingQueue.put(message);
                        break;
                    } else {
                        consumedMessages.incrementAndGet();
                        System.out.println("Consumer got message " + message);
                    }
                }
            } catch (InterruptedException e) {
                throw new RuntimeException("Consumer interrupted", e);
            } finally {
                if (consumersCount.decrementAndGet() <= 0) {
                    System.out.println("Last consumer, clearing the queue");
                    messageBlockingQueue.clear();
                }
            }
        }
    }

    public static void main(String[] args) {

        final AtomicInteger producerCount = new AtomicInteger(4);
        final AtomicInteger consumersCount = new AtomicInteger(2);
        final AtomicInteger consumedMessages = new AtomicInteger();
        BlockingQueue<Message> messageBlockingQueue = new LinkedBlockingQueue<>();


        List<CompletableFuture<Void>> tasks = new ArrayList<>();
        for (int i = 0; i < producerCount.get(); i++) {
            tasks.add(CompletableFuture.runAsync(new Producer("" + (i + 1), messageBlockingQueue, producerCount)));
        }

        for (int i = 0; i < consumersCount.get(); i++) {
            tasks.add(CompletableFuture.runAsync(new Consumer(messageBlockingQueue, consumersCount, consumedMessages)));
        }

        CompletableFuture.allOf(tasks.toArray(new CompletableFuture[0])).join();

        System.out.println("Consumed " + consumedMessages + " messages");

    }
}
 类似资料:
  • 问题内容: 我正在寻找一种方法来阻止直到a 为空。 我知道,在多线程环境中,只要有生产者将项目放入,就可能会出现队列变空并且几秒钟后队列中充满项目的情况。 但是,如果只有 一个 生产者,则它可能要等待(并阻止)直到队列停止为空后再将其放入队列。 Java /伪代码: 你有什么主意吗? 编辑 :我知道包装和使用额外的条件可以解决问题,我只是问是否有一些预制的解决方案和/或更好的选择。 问题答案: 使

  • java.util.concurrent.BlockingQueue接口是Queue接口的子接口,并且还支持诸如在检索元素之前等待队列变为非空的操作,并在存储元素之前等待队列中的空间可用。 BlockingQueue方法 Sr.No. 方法和描述 1 boolean add(E e) 如果可以在不违反容量限制的情况下立即执行此操作,则将指定的元素插入此队列,成功时返回true,如果当前没有可用空间

  • 问题内容: 我正在使用SqlBulkCopy从xml备份还原表。其中一个表备份约为200MB,并且有很多记录。 我有错误: 问题答案: 您可能需要增加超时时间。尝试从默认值30秒增加该值。

  • 问题内容: 从PHP运行长查询时, [如果用户在浏览器中按Stop,我如何杀死查询? 考虑到我不能调用任何其他PHP函数,因为在等待MySQL时PHP被阻止了。 另外,由于会话锁定,我无法再通过Ajax向服务器发出任何请求。 因此,一种解决方案可能是: 忽略用户中止 在后台运行长查询,并让PHP每100毫秒检查一次是否完成 从查询中获取pid 如果用户中止,请杀死该pid 否则返回结果 我不知道该

  • 问题内容: 我看到许多连接处于打开状态,并且很长一段时间(例如5分钟)保持空闲状态。 有什么解决方案可以在不重新启动mysql服务的情况下从服务器终止/关闭它? 我正在维护旧的PHP系统,无法关闭为执行查询而建立的连接。 我应该将my.cnf文件中的超时值减少为默认的8小时吗? 问题答案: 手动清理: 您可以杀死该进程ID。 但: php应用程序可能会报告错误(或网络服务器,请检查错误日志) 不要

  • 我的程序很简单。我有一个有2个方法的类。我有另一个类在那里调用这两个方法。当调用第一个方法时,初始化arraylist并填充一些对象,然后调用第二个方法,简单地告诉arraylist的大小并打印出来。我不明白为什么我总是得到一个空的时候,第二个方法被调用。然而,当我在第一个方法中打印数组时,我得到了大小和所有it元素。Arraylist是公共得..请看一下代码。 在第一班