当前位置: 首页 > 知识库问答 >
问题:

发布者/订阅者模式的并发实现

柳和怡
2023-03-14

我想用Java实现各种各样的发布者/订阅者模式,但目前已经没有主意了。

有1个发布者和N个订阅者,发布者发布对象,然后每个订阅者需要按照正确的顺序对每个对象进行一次且仅处理一次。发布者和每个订阅者在自己的线程中运行。

在我最初的实现中,每个订阅者都有自己的阻塞队列,发布者将对象放入每个订阅者的队列中。这可以正常工作,但如果任何订阅者的队列已满,发布者将被阻塞。这会导致性能下降,因为每个订阅者处理对象的时间不同。

然后在另一个实现中,发布者将该对象保存在其自己的队列中。除了对象之外,还有一个AtomicInteger计数器与它相关联,其中包含了订阅者的数量。然后,每个订户查看队列并减少计数器,并在计数器达到零时将其从队列中移除。

通过这种方式,发布者免于阻塞,但现在订阅者需要等待彼此处理对象,从队列中删除对象,然后才能查看下一个对象。

有什么更好的方法吗?我认为这应该是一种很常见的模式。

共有3个答案

习洲
2023-03-14

有1个发布者和N个订阅者,发布者发布对象,然后每个订阅者需要按照正确的顺序对每个对象进行一次且仅处理一次。发布者和每个订阅者在自己的线程中运行。

我会改变这个建筑。我最初考虑了每个用户的队列,但我不喜欢这种机制。例如,如果第一个订阅者需要更长的时间来运行,那么所有的作业都将在该队列中结束,您将只执行一个线程的工作。

由于您必须按顺序运行订阅者,因此我有一个线程池,通过所有订阅者运行每条消息。对订阅者的调用需要是可重入的,这可能是不可能的。

因此,您将有一个包含 10 个线程的池(假设),每个线程都从发布者的队列中取消排队,并执行如下操作:

public void run() {
    while (!shutdown && !Thread.currentThread().isInterrupted()) {
        Article article = publisherQueue.take();
        for (Subscriber subscriber : subscriberList) {
           subscriber.process(article);
        }
    }
}
禄和宜
2023-03-14

毫无疑问

每个订阅者都有自己的阻塞队列,发布者将对象放入每个订阅者的队列

这是要走的路。您可以使用线程方法将其放入队列…所以如果一个队列已满,发布者不会等待…

s1 s2 s3是订阅者,addToQueue是每个订阅者中添加到腐蚀队列的方法。addQueue方法是等待队列非空…所以调用addQueue将是阻塞调用理想的同步code

然后,在publisher中,您可以执行类似以下代码的操作

注意:代码可能未处于正常工作状态。但应该给你想法。

List<subscriber> slist;// Assume its initialised
public void publish(final String message){

    for (final subscriber s: slist){


          Thread t=new Thread(new Runnable(){
             public void run(){
                s.addToQueue(message);
             }
           });

      t.start();
    }

}
澹台庆
2023-03-14

您的“多队列”实现是正确的。我认为您不必担心一个完整的队列阻塞了生产者,因为完成的总时间不会受到影响。假设你有三个消费者,两个以每秒1的速率消费,第三个以每五秒1的速率消费,同时生产者以每两秒1的速率生产。最终,第三个队列会被填满,因此生成器会阻塞它,并停止将项目放入第一个和第二个队列。有很多方法可以解决这个问题,但它们不会改变第三消费者永远是瓶颈的事实。如果您正在生产/消费100个项目,那么由于第三个消费者的原因,这将至少需要500秒(5秒乘以100个项目),即使第一个和第二个消费者在200秒后完成(因为您已经做了一些聪明的事情,允许生产者在第三个队列已满后继续填充他们的队列),或者如果他们在500秒后完成(因为生产者在第三个队列上阻塞),也会出现这种情况。

 类似资料:
  • 本文向大家介绍JavaScript设计模式之观察者模式(发布者-订阅者模式),包括了JavaScript设计模式之观察者模式(发布者-订阅者模式)的使用技巧和注意事项,需要的朋友参考一下 观察者模式( 又叫发布者-订阅者模式 )应该是最常用的模式之一. 在很多语言里都得到大量应用. 包括我们平时接触的dom事件. 也是js和dom之间实现的一种观察者模式. 只要订阅了div的click事件. 当点

  • 我需要在发布/订阅模式下调用Kafka消费者1000次。据我所知,为了让kafka在发布/订阅模式下工作,我需要给每个消费者一个新的groupId(props . put(" group . id ",String.valueOf(Instant.now())。toEpochMilli()));).但是当我这样做的时候,如果两个消费线程同时访问消费线程,就会出现问题。这个问题应该怎么解决?

  • 本文向大家介绍JS模式之简单的订阅者和发布者模式完整实例,包括了JS模式之简单的订阅者和发布者模式完整实例的使用技巧和注意事项,需要的朋友参考一下 本文实例讲述了JS模式之简单的订阅者和发布者模式。分享给大家供大家参考。具体如下: 希望本文所述对大家的javascript程序设计有所帮助。

  • 目前,我已经开始使用ActiveMQ处理JMS主题。我已经通过JAVA代码(如下所述)创建了发布者和持久订阅者,并且在订阅者端也收到了消息。 Publisher.Java 订阅者.java 我对以下主题有一些疑问, 如何检查有多少订阅者使用 Java JMS 在主题中主动查找消息? 如何从主题中获取活动和持久订阅者列表? 我们是否可以删除主题中发布的消息? 在这些情况下帮助我。 提前致谢。

  • 我正在创建一个API来发布和使用来自RabbitMQ的消息。在我当前的设计中,发布者将发出一个POST请求。我的API会将POST请求路由到Exchange。这样,发布者在发布时就不必知道服务器地址、exchange名称等。 现在消费者部分是我不确定如何继续的地方。 开始时不会排队。当一个新的消费者想要订阅一个主题时,那么我将创建一个队列并将其绑定到交换。我需要一些问题的答案- 一旦我为使用者创建

  • Redis 发布订阅(pub/sub)是一种消息通信模式:发送者(pub)发送消息,订阅者(sub)接收消息。 Redis 客户端可以订阅任意数量的频道。 下图展示了频道 channel1 , 以及订阅这个频道的三个客户端 —— client2、client5 和 client1 之间的关系: 当有新消息通过 PUBLISH 命令发送给频道 channel1 时, 这个消息就会被发送给订阅它的三个