当前位置: 首页 > 知识库问答 >
问题:

RabbitMQ:具有直接绑定的消费者的动态数量

邰英毅
2023-03-14

我有以下场景:

  • 一次生产性服务
  • 消费者服务的动态数量
  • 消息包含特定产品的任务,因此一旦使用者x处理产品Y的消息。将来,x应该处理产品Y的所有消息。理想情况下,生产者服务应该将产品x的所有消息发送到一个队列中,只有消费者x才能从中读取。
  • 为了平均分配工作负载,应该有一种方法,即一旦需要管理新产品,下一个可用使用者就会使用它。(我假设是所有使用者都在读取的队列)

我的方法:

  • exchange发送“NewProduct”队列中的新产品作业,所有使用者都从该队列消费。
  • 读取这样一条消息的使用者y通知生产者服务(在一个单独的队列上),他现在负责产品X。
  • 然后,生产者将产品x的所有消息发送到使用者Y的队列。
  • 当新的使用者服务z联机时,它通知特定队列上的生产者服务他联机了,这样生产者就可以在交换中为z的正确队列创建绑定。

问题:

  • 我的方法是解决问题的好方法吗,还是我缺少了rabbitmq解决方案,它可以以不那么复杂的方式解决问题?
  • 如何在运行时向exchange添加新队列?

共有1个答案

空佐
2023-03-14

交换发送“NewProduct”队列中的新产品作业,所有使用者都从该队列中消费。

我觉得这个不错。

读取这样一条消息的消费者y通知生产者服务(在一个单独的队列上)他现在负责产品X。这也是好的,我猜如果生产者没有收到通知,产品X被照顾,它将需要做一些事情。然后,生产者将产品x的所有消息发送到消费者Y的队列。

我将使用相同的路由密钥发送产品X的所有消息,如product-x。你大概就是这个意思。我会避免告诉producer现在是谁来具体处理产品-X的。为了更好地分离关注点和简单性,生产者应该尽可能少地了解消费者及其队列,反之亦然。

当一个新的消费者服务z上线时,它会通知特定队列上的生产者服务他在线了,这样生产者就可以在交换中为z的正确队列创建绑定。

你可以这样做,但我会用不同的方法

当consumer上线时,它将自己创建所需队列(或订阅现有队列)。

我是这样看的:

  • 使用者联机并订阅newProduct队列。
  • 当收到处理产品Z的消息时:
  • 使用绑定密钥product-z
  • 为自身创建一个新队列
  • 通知生产者产品Z正在处理
  • 生产者开始发送具有路由密钥product-z的消息,这些消息最终进入使用者的队列。

请确保您的消费者具有一些高可用性,否则您可能会出现这样的情况:消费者开始处理某些消息,然后就死了,而producer则继续为现在未处理的产品发送消息。

 类似资料:
  • 我是Guice的新手,正在为以下用例寻求帮助: 我开发了一个软件包(PCKG),其中该软件包的入门级依赖于其他类,例如: 在我的绑定模块中,我正在做: 注意我没有为A提供绑定信息,因为我想通过它的消费者类来提供它的绑定。(设计就是这样,所以我的要求是继续讨论主要问题,而不是设计)。 现在,我的消费者阶层正在做这样的事情: 同样在我的消费者包装中: Q1。我在消费者类中做的绑定正确吗?我很困惑,因为

  • 有以下消费者代码: 然后我用脚本生成消息: 问题是,当我将消费者作为两个不同的进程启动时,我会在每个进程中收到新消息。但是,我希望它只发送给一个消费者,而不是广播。 在Kafka的文献中(https://kafka.apache.org/documentation.html)其中写道: 如果所有使用者实例都具有相同的使用者组,则其工作原理就像在使用者之间平衡负载的传统队列一样。 我发现这些消费者的

  • 我对RabbitMQ很陌生,所以如果我的问题听起来很琐碎,请原谅。我想在RabbitMQ上发布消息,它将由RabbitMQ消费者处理。 我的消费者机器是一个多核机器(最好是azure上的工作者角色)。但QueueBasicConsumer一次推送一条消息。我如何编程来利用我可以同时处理多个消息的所有核心。 一种解决方案是在多个线程中打开多个通道,然后在那里处理消息。但在这种情况下,我将如何决定线程

  • 问题内容: 我正在尝试创建一个数据库查询函数,该函数可以采用多个参数并可以在其他地方重用,但是我尝试了许多与我的方法类似的在线方法,但它们无法按预期工作。 我希望能够使用此单个函数随时随地运行查询,使用PDO驱动程序这种类型的函数要容易得多,因为您只需在内部输入绑定即可,但是在这种情况下,我不得不立即将MySQLi用作应用程序依赖它,但想要升级它以使用准备好的语句。 我需要如何使用该函数以确保其可

  • 我使用的是运行在AWS中的spark独立集群(spark and spark-streaming-kafka version 1.6.1),并对检查点目录使用S3桶,每个工作节点上没有调度延迟和足够的磁盘空间。 没有更改任何Kafka客户端初始化参数,非常肯定Kafka的结构没有更改: 也不明白为什么当直接使用者描述说时,我仍然需要在创建流上下文时使用检查点目录?

  • 文件描述符:256个可用套接字描述符:138个可用 显然我不能打开超过138个连接。 问题1:这个限制是基于什么?我能提高它吗?我想知道在生产机器上(需要哪种EC2实例),每个用户有一个连接是否是一个好主意。我读到过限制可能与ulimit有关,但当我在命令行上运行ulimit时,我看到的是'unlimited'。 还有什么其他的策略?