当前位置: 首页 > 知识库问答 >
问题:

如何在直接交换中与多个消费者确认消息

霍修筠
2023-03-14

  1. 发布者创建reply_to队列并发布到路由密钥,其中包含一条消息,告诉消费者向队列发送响应(RPC协议),以及一个传回的相关id,以便所有未来的结果都与该唯一标识符相关联
  2. Exchange向绑定到该路由密钥的所有队列发送消息。这里,有两个消费者的两个队列,每个都绑定到路由密钥“泵”
  3. 一段时间后,消费者回复回reply_to队列,然后确认消息,以便他们的唯一队列删除发送到其队列的消息。每个收到消息的消费者都会这样做。
  4. 代理将响应发送到RPC队列。发布者确认它收到的每条消息,确认它收到的消息

我知道这很令人困惑。。基本上可以归结为一个问题——信息与什么有关?这在循环赛中是显而易见的。每条消息被发送到一个队列,消费者可以确认它;然而,如果同一条消息有多个消费者,我觉得每个队列(以及绑定到它的每个消费者)都有自己的消息发送给消费者,每个消息都必须得到确认。是这样吗?

RabbitMQ有这样一句话:

https://www.rabbitmq.com/confirms.html#acknowledgement-模式根据使用的确认模式,RabbitMQ可以认为消息在发送(写入TCP套接字)后立即成功传递,或者在收到显式(“手动”)客户端确认时成功传递。

不幸的是,这并没有提到队列,以及当有多个队列和它们自己的消费者时会发生什么。

共有1个答案

祝英博
2023-03-14

使用RabbitMQ,对于真正的扇出方法,每个使用者最好将自己的队列绑定到exchange,这样每个使用者都将在不影响其他使用者的情况下接收和使用消息

例如,销售代表向收银员发送订单,其中有多个销售代表和多个收银员。

销售代表发送订单

Channel.ExchangeDeclare(exchange: "cashieradd", ExchangeType.Fanout);
                var jsonResult = JsonConvert.SerializeObject(new CashierQueue()
                {
                    transactionId = transactionId
                });
                var body = Encoding.UTF8.GetBytes(jsonResult);

                Channel.BasicPublish(exchange: "cashieradd", routingKey: "", basicProperties: null, body: body);

每个出纳员都会订阅交易所

        {
            var cashierAddQueue = Channel.QueueDeclare().QueueName;
            Channel.QueueBind(queue: cashierAddQueue, exchange: "cashieradd", routingKey: "");
            var consumer = new EventingBasicConsumer(Channel);
            Channel.BasicConsume(queue: cashierAddQueue, autoAck: true, consumer: consumer);
            return consumer;
        }

这使用了RabbitMQ动态队列,但是,消费者独有的任何队列都将具有相同的效果。

这里不一定需要例程键

 类似资料:
  • 我需要使用consume process Product模式来处理Kafka消息,并已使用Kafka事务管理器配置了Spring Kafka侦听器容器,还设置了事务id前缀以启用Kafka事务。我正在使用批处理的ack模式,并试图了解在这种模式下,在事务中何时提交偏移量。文档似乎表明,一旦使用了轮询中的所有记录,ack模式批提交偏移量——在事务上下文中也是这样吗,即每个轮询1个事务? 或者,在使用

  • 问题内容: 我有一个JMS客户端,它正在生成消息并通过JMS队列发送到其唯一的使用者。 我想要的是不止一个消费者收到这些消息。我想到的第一件事是将队列转换为主题,以便现有用户和新用户都可以订阅并将相同的消息传递给他们。 显然,这将涉及在生产者和消费者方面修改当前的客户代码。 我还要查看其他选项,例如创建第二个队列,这样就不必修改现有的使用者。我相信这种方法有很多优点,例如(如果我错了,请纠正我)在

  • 是否有一种方法以编程方式访问和打印使用者滞后偏移,或者说使用者读取的最后一条记录的偏移与某个生产者写入该使用者分区的最后一条记录的偏移之间的位置差。 要知道我的最终目标是将这个值发送到prometheus进行监视,我应该在上面添加哪些语句来得到滞后偏移值?

  • 问题内容: 我是一名学习Kafka的新学生,在了解多个消费者(到目前为止,文章,文档等对他们没有太大帮助)方面,我遇到了一些基本问题。 我尝试做的一件事是编写我自己的高级Kafka生产者和消费者并同时运行它们,将100条简单消息发布到某个主题,然后让消费者检索它们。我已经成功地做到了这一点,但是当我尝试引入另一个使用者来使用与刚刚发布消息的主题相同的主题时,它没有收到消息。 据我了解,对于每个主题

  • TL;DR;我试图理解一个被分配了多个分区的单个使用者是如何处理reach分区的消费记录的。 例如: 在移动到下一个分区之前,会完全处理一个分区。 每次处理每个分区中的可用记录块。 从第一个可用分区处理一批N条记录 以循环旋转方式处理来自分区的N条记录 我找到了或分配程序的配置,但这只决定了使用者如何分配分区,而不是它如何从分配给它的分区中使用。 我开始深入研究KafkaConsumer源代码,#

  • 我运行生产者,它生成N条消息,我在仪表板上看到它们。当我运行接收器时,它会接收来自队列的所有消息,并且队列为空。 我需要有多个生产者生成消息到同一个队列。多个客户从队列中接收消息。消息将被队列TTL删除。但是现在第一个接收者从队列中获取所有消息。我怎么能做到这一点?