当前位置: 首页 > 知识库问答 >
问题:

阻塞队列。size()返回发布服务器订阅服务器中的错误大小

温智明
2023-03-14

我对一个发布者-多个订阅者模式的实现有疑问。发布者使用固定大小的缓冲区并将消息排队。消息被发送给所有订户。订阅者获取消息的顺序必须与发布消息的顺序相同。

我使用阻止队列来保存发布者消息(发布者队列)并将其传递给每个订阅者阻止队列(订阅者队列)。

问题是缓冲区和订阅服务器工作正常,但缓冲区大小 (发布者Queue.size()) 始终返回 1。

System.out.println("Actual number of messages in buffer: " + publisherQueue.size());

这是我的完整代码:

PublisherSubscriberService.java

package program;

import java.util.HashSet;
import java.util.Set;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;


public class PublisherSubscriberService {
    private int buffer;
    private int subscribersNumber;
    static Set<subscriber> subscribers = new HashSet<subscriber>();

    public PublisherSubscriberService(int buffer, int subscribersNumber) {
        this.buffer = buffer;
        this.subscribersNumber = subscribersNumber;
    }

    public void addsubscriber(subscriber subscriber) {
        subscribers.add(subscriber);
    }

    public void start() {
        publisher publisher = new publisher(buffer);
        System.out.println("publisher started the job");

        for (int i = 0; i < subscribersNumber; i++) {
            subscriber subscriber = new subscriber(buffer);
            subscriber.setName(Integer.toString(i + 1));
            subscribers.add(subscriber);
            new Thread(subscriber).start();
            System.out.println("Subscriber " + subscriber.getName() + " started the job");
        }
        new Thread(publisher).start();
    }

    public class Publisher implements Runnable {
        private int buffer;
        final BlockingQueue<Message> publisherQueue;

        public Publisher(int buffer) {
            this.buffer = buffer;
            publisherQueue = new LinkedBlockingQueue<>(buffer);
        }

        @Override
        public void run() {
            for (int i = 1; i < 100; i++) {
                Message messageObject = new Message("" + i);
                try {
                    Thread.sleep(50);
                    publisherQueue.put(messageObject);
                    System.out.println("Queued message no " +         messageObject.getMessage());
                    System.out.println("Actual number of messages in buffer:     " + publisherQueue.size());
                    for (subscriber subscriber : subscribers) {
                        subscriber.subscriberQueue.put(messageObject);
                    }
                    publisherQueue.take();
                } catch (InterruptedException e) {
                    System.out.println("Some error");
                    e.printStackTrace();
                }
            }
        }
    }

    class Subscriber implements Runnable {
        private String name;
        private int buffer;
        final BlockingQueue<Message> subscriberQueue;

        public Subscriber(int buffer) {
            this.buffer = buffer;
            subscriberQueue = new LinkedBlockingQueue<>(buffer);
        }

        public void setName(String name) {
            this.name = name;
        }

        public String getName() {
            return name;
        }

        @Override
        public void run() {
            try {
                Message messageObject;
                while (true) {
                    Thread.sleep(100);
                    messageObject = subscriberQueue.take();
                    System.out.println(this.getName() + " got message: " + messageObject.getMessage());
                }
            } catch (InterruptedException e) {
                System.out.println("Some error");
                e.printStackTrace();
            }
        }
    }
class Message {
    private String message;

    public Message(String str) {
        this.message = str;
    }

    public String getMessage() {
        return message;
    }

}
}

PublisherSubscriberProgramm.java

    package program;

public class ProducerConsumerProgram {

    public static void main(String[] args) {
        ProducerConsumerService service = new ProducerConsumerService(10, 3);
        service.start();
    }
}

共有1个答案

傅穆冉
2023-03-14

发布者在队列中从未有过 1 个项目。每次通过你的循环,你把和拿一个项目:

                **publisherQueue.put(messageObject);**
                System.out.println("Queued message no " +         messageObject.getMessage());
                System.out.println("Actual number of messages in buffer:     " + publisherQueue.size());
                for (subscriber subscriber : subscribers) {
                    subscriber.subscriberQueue.put(messageObject);
                }
                **publisherQueue.take();**

使用您提供的代码,甚至拥有发布者队列也是有意义的。

 类似资料:
  • 问题内容: 我正在尝试使用Redis Cookbook示例: 我在这里取得了成功,但从未得到“消息”。 我的客户端index.htm是这个 客户如何发布到特定的Redis“聊天”频道。 问题答案: 如果您在node.js程序中使用Redis发布/订阅功能,则应使用一个Redis客户端连接来监听某个频道,使用另一个Redis客户端连接来发送常规命令和/或将消息发布到您的频道。从node_redis文

  • 现在我们已经知道了Java NIO里面那些非阻塞特性是怎么工作的,但是要设计一个非阻塞的服务仍旧比较困难。非阻塞IO相对传统的阻塞IO给开发者带来了更多的挑战。在本节非阻塞服务的讲解中,我们一起来讨论这些会面临的主要挑战,同时也会给出一些潜在的解决方案。 查找关于设计非阻塞服务的相关资料是比较难的,本文提出的解决方案也只能是基于笔者个人的工作经验,构思。如果你有其他的解决方案或者是更好的点子,那么

  • 假设,我有一个服务A,它有一个线程池执行器来调用服务B。我们可以用自己的值设置该池的核心池大小和队列。现在,服务B对请求的响应速度很慢,因为服务A线程池中的活动线程增加,导致阻塞队列大小增加。如何防止服务A的队列大小增加? 背景

  • 问题内容: 这是我在laravel 5中测试的ajax(请参阅下文) 和触发链接 和我的路线 但是当我在google chrome中运行控制台时,它给了我错误,并且未返回预期的响应“在laravel 5中返回’成功!ajax’;” POST http://juliver.laravel.com/test 500(内部服务器错误) 我的代码有什么问题/问题?我缺少什么? 问题答案: 虽然这个问题存在

  • 我有一个方法,可以启动一个简单的cpp grpc服务器。 我想做

  • 主要内容:1 非阻塞服务器-GitHub仓库,2 无阻塞IO管道,3 非阻塞与阻塞IO管道,4 基本的无阻塞IO管道设计,5 读取部分消息,6 存储部分消息,7 编写部分消息,8 总结,9 服务器线程模型即使你了解了Java NIO非阻塞功能如何工作(Selector,Channel, Buffer等),设计一个无阻塞服务器仍然很难。与阻塞IO相比,非阻塞IO包含多个挑战。这份非阻塞服务器教程将讨论非阻塞服务器的主要挑战,并为它们描述一些潜在的解决方案。 本教程中描述的思想是围绕Java NIO