当前位置: 首页 > 知识库问答 >
问题:

RabbitMQ-主题交换-两个或多个消费者的相同主题

宇文峰
2023-03-14

我是AMQP的新手,正在尝试为RabbitMQ系统制定一个通知架构。

我想要一个主题交换(通知交换,比方说),特别是因为我想灵活地使用主题交换附带的路由密钥和队列,以及将来扩展该主题的更多选项。不过,我可能是错的,因为...

我还想让两个或更多的消费者使用每个通知。作为基线,我希望发布的每个通知都在数据库中结束。此外,我希望每个通知都可以由客户端应用程序使用(例如,web应用程序使用并进一步通过套接字发送即时用户通知,而无需数据库轮询)。

这听起来真的像是一个扇形的情况,除了我不想这样做,因为我需要更多的队列来处理各种通知(我认为 - AMQP仍然是新的,并试图围绕它)。

是否有可能让两个使用者(一致地)从同一队列获得通知?

例如:

  • 推送<code>Notif.NotifGroup.User。此用户通过通知交换
  • 将<code>dbListener
  • 让<code>mvcClientListener(并进一步确定用户是否在线并通过套接字向下游推送)

我不确定我是否在正确的轨道上。我读到“同一队列的多个消费者以循环方式进行负载平衡”,坦率地说,我不知道这是什么意思。

是否有可能有一个主题交换,其中两个使用者可以一致地从同一队列中读取相同的消息(例如,相同的路由密钥),或者我必须为此使用Fanout Exchange?

谢了。

共有1个答案

燕建中
2023-03-14

是否有可能让两个使用者(一致地)从同一队列获得通知?

是的,但这不是你想要的。

当一个队列上有多个使用者时,这些使用者是轮循机制负载平衡的。如果使用者 1 收到消息 a,则使用者 2 无法接收它

相反,您要做的是为每个消费者添加额外的路由处理程序,并让每个消费者创建自己的队列。

例如,NotificationEx 可能有一个 Notif.# 的绑定,它推送到 dbQueue。同一个交换将有一个 Notif.# 绑定,它将消息推送到 mvcQueue

您的数据库消费者将从<code>dbQueue

这使得添加新的消费者变得非常容易,每个消费者都有自己的队列并绑定到交换。

FWIW,对于任何给定的情况,没有“正确”或“错误”的交换类型。您可以让任何交换类型适用于任何情况。然而,一些交换类型使得处理某些情况更容易。

您可能想查看我在RabbitMQ上的电子书,以了解有关交换类型、它们普遍接受的“最佳用例”以及它们如何应用于相当有趣的场景的更多信息:https://leanpub.com/b/rmq-layout-and-patterns

 类似资料:
  • 我成功地建立了一个话题交换,并且能够同时向几个消费者传递消息。 我还想向竞争对手传递信息,并继续使用主题交换。我了解到,使用相同的队列名称可以让消费者竞争消息。然而,我可能弄错了,因为我无法使它工作。 为同一主题的多个侦听器设置: < li >申报话题交流 < li >对于每个侦听器,用自动生成的名称声明一个新队列 < li >用给定的主题路由关键字将此队列绑定到上面的交换 如何将相互竞争的消费者

  • 我们正在开发一个应用程序,我们想听Kafka中不止一个主题。所有主题都有一个分区。所有主题名称都有一个公共的前缀,例如“test-x”、“test-y”,所以我们可以对它使用spring。 我们希望编写一个java spring使用者,它使用模式监听所有主题。我们的想法是,我们可以运行同一个消费者(属于同一个组)的多个实例,Kafka将为不同的消费者分发来自不同主题的消息。 然而,这似乎并不奏效。

  • 问题内容: 我有一个主题列表(目前为10个),其规模将来可能会增加。我知道我们可以在每个主题中产生多个线程(每个主题)使用,但是就我而言,如果主题数量增加,那么从主题中使用的线程数量就会增加,这是我不希望的,因为主题不是太频繁地获取数据,因此线程将处于理想状态。 有没有办法让一个消费者从所有主题中消费?如果是,那我们如何实现呢?另外,Kafka将如何维护偏移量?请提出答案。 问题答案: 我们可以使

  • 在我的Spring Boot Kafka应用程序中,我有以下使用者配置: 消费者: 如果我理解正确的话,现在我有一个消费者的实例。我想增加post消费者的数量,假设有5个消费者将消费来自${kafka.topic.post.send}的不同(不同)消息,以加快消息消费。 它是否像添加工厂一样简单。setConcurrency(5) 至我的PostKafkAlisterContainerFactor

  • 我有一个主题列表(目前是10个),其大小可以在未来增加。我知道我们可以产生多个线程(每个主题)来消耗每个主题,但在我的例子中,如果主题的数量增加,那么消耗主题的线程数量也会增加,这是我不希望的,因为主题不会太频繁地获取数据,所以线程将是理想的。 有没有办法让单个消费者从所有话题中消费?如果是的话,我们怎样才能做到呢?另外,Kafka将如何维护抵消?请建议答案。

  • 我想为几个主题创建一个kafka消费者。consumer的方法构造函数允许我在订阅中传输主题列表的参数,如下所示: 之后,我想轮询记录从Kafka流每3秒并处理它们,但我想知道什么是这个消费者-如何将不同主题的记录轮询-首先一个主题,然后另一个,或并行。会不会一个消息量大的主题会一直处理,另一个消息量小的主题会等待?